Source code for smc.core.engine

#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.

from collections import namedtuple
import pytz

from smc.base.util import save_to_file
from smc.core.advanced_settings import LogModeration
from smc.core.lldp import LLDPProfile
from smc.core.policy import AutomaticRulesSettings
from smc.elements.helpers import domain_helper, location_helper
from smc.base.model import Element, SubElement, lookup_class, ElementCreator, \
    ElementRef, ElementCache

from smc.api.exceptions import (
    UnsupportedEngineFeature,
    UnsupportedInterfaceType,
    EngineCommandFailed,
    SMCConnectionError, CreateElementFailed, UpdateElementFailed, CertificateExportError,
    CertificateImportError
)
from smc.core.node import Node
from smc.core.resource import Snapshot, PendingChanges
from smc.core.interfaces import InterfaceOptions, PhysicalInterface
from smc.core.collection import (
    InterfaceCollection,
    LoopbackCollection,
    PhysicalInterfaceCollection,
    TunnelInterfaceCollection,
    VPNBrokerInterfaceCollection,
    VirtualPhysicalInterfaceCollection,
    SwitchInterfaceCollection,
)
from smc.administration.tasks import Task
from smc.administration.certificates.tls_common import pem_as_string
from smc.elements.group import ConnectionSynchronizationGroup
from smc.elements.other import prepare_block_list, prepare_blacklist
from smc.elements.network import Alias
from smc.vpn.elements import VPNSite, LinkUsageProfile
from smc.routing.bgp import DynamicRouting
from smc.routing.ospf import OSPFProfile
from smc.core.route import Antispoofing, Routing, Route, PolicyRoute
from smc.core.session_monitoring import SessionMonitoringResult
from smc.core.contact_address import ContactAddressCollection
from smc.core.general import DNSRelay, Layer2Settings, DefaultNAT, SNMP, RankedDNSAddress, \
    NTPSettings
from smc.core.addon import (
    AntiVirus,
    FileReputation,
    SidewinderProxy,
    UrlFiltering,
    Sandbox,
    TLSInspection,
    ClientInspection,
    ZTNAConnector,
    EndpointIntegration
)
from smc.elements.servers import LogServer
from smc.base.collection import create_collection, sub_collection
from smc.base.util import element_resolver
from smc.administration.access_rights import AccessControlList, Permission, \
    GrantedElementPermissions
from smc.base.decorators import cacheable_resource
from smc.administration.certificates.vpn import GatewayCertificate, GatewayCertificateRequest
from smc.base.structs import BaseIterable, NestedDict
from smc.elements.profiles import SNMPAgent
from smc.elements.ssm import SSHKnownHostsLists
from smc.compat import is_smc_version_less_than_or_equal, is_api_version_less_than_or_equal, \
    min_smc_version, is_api_version_less_than, is_smc_version_equal


[docs] class LinkUsageExceptionRules(NestedDict): def __init__(self, destinations=None, services=None, sources=None, isp_link_ref=None, comment=None): """ LinkUsageExceptionRules Set to engine if Link usage profile is set to engine in routing :param list destinations: list of destinations :param list services: list of services :param list sources: list of sources :param object isp_link: object type can be static netlink, dynamic netlink, Outbound multilink or Server Pool :param str comment: simple comment Example: :raises Errors :return: None """ if sources == "any": sources = {"any": True} if destinations == "any": destinations = {"any": True} if services == "any": services = {"any": True} if type(sources) is list: value_list = [] for val in sources: v = element_resolver(val) value_list.append(v) sources_json = {} sources_json.update(src=value_list) sources = sources_json if type(services) is list: value_list = [] for val in services: v = element_resolver(val) value_list.append(v) services_json = {} services_json.update(service=value_list) services = services_json if type(destinations) is list: value_list = [] for val in destinations: v = element_resolver(val) value_list.append(v) destinations_json = {} destinations_json.update(dst=value_list) destinations = destinations_json dc = dict( sources=sources, destinations=destinations, services=services, isp_link_ref=element_resolver(isp_link_ref), comment=comment ) super(LinkUsageExceptionRules, self).__init__(data=dc) @property def isp_link_ref(self): """ isp_link :rtype: isp_link """ return ( self.get("isp_link_ref") ) @isp_link_ref.setter def isp_link_ref(self, value): self.update(isp_link_ref=value) @property def comment(self): """ comment. :rtype: str """ return self.get("comment") @comment.setter def comment(self, value): self.update(comment=value) @property def destinations(self): return self.get("destinations") @destinations.setter def destinations(self, value): if value == "any": value = {"any": "true"} self.update(destinations=value) else: value_list = [] for val in value: v = element_resolver(val) value_list.append(v) destinations_json = {} destinations_json(dst=value_list) self.update(destinations=destinations_json) @property def sources(self): return self.get("sources") @sources.setter def sources(self, value): if value == "any": value = {"any": "true"} self.update(sources=value) @property def services(self): return self.get("services") @services.setter def services(self, value): if value == "any": value = {"any": "true"} self.update(services=value)
[docs] class Engine(Element): """ An engine is the top level representation of a firewall, IPS or virtualized software. Engine can be referenced directly and will be loaded when attributes are accessed:: >>> from smc.core.engine import Engine >>> engine = Engine('testfw') >>> print(engine.href) http://1.1.1.1:8082/6.1/elements/single_fw/39550 Generically search for engines of all types:: >>> list(Engine.objects.all()) [Layer3Firewall(name=i-06145fc6c59a04335 (us-east-2a)), FirewallCluster(name=sg_vm), Layer3VirtualEngine(name=ve-5), MasterEngine(name=master-eng)] Or only search for specific engine types:: >>> from smc.core.engines import Layer3Firewall >>> list(Layer3Firewall.objects.all()) [Layer3Firewall(name=i-06145fc6c59a04335 (us-east-2a))] Engine types are defined in :class:`smc.core.engines`. """ typeof = "engine_clusters" @classmethod def _create( cls, name, node_type, physical_interfaces, nodes=1, nodes_definition=[], loopback_ndi=None, log_server_ref=None, domain_server_address=None, enable_antivirus=False, enable_gti=False, sidewinder_proxy_enabled=False, known_host_lists=[], default_nat=False, location_ref=None, enable_ospf=None, ospf_profile=None, snmp_agent=None, comment=None, ntp_settings=None, timezone=None, lldp_profile=None, link_usage_profile=None, discard_quic_if_cant_inspect=True, **kw ): """ Create will return the engine configuration as a dict that is a representation of the engine. The creating class will also add engine specific requirements before constructing the request and sending to SMC (which will serialize the dict to json). :param name: name of engine :param str node_type: comes from class attribute of engine type :param dict physical_interfaces: physical interface list of dict :param int nodes: number of nodes for engine :param list nodes_definition: list of definition for each node :param str log_server_ref: href of log server :param list domain_server_address: dns addresses :param NTPSettings ntp_settings: ntp settings :param LLDPProfile lldp_profile: LLDP Profile represents a set of attributes used for configuring LLDP :param bool discard_quic_if_cant_inspect: (optional) discard or allow QUIC if inspection is not possible """ node_list = [] for nodeid in range(1, nodes + 1): # start at nodeid=1 node_name = name + " node " + str(nodeid) if nodes_definition and nodes_definition.__len__() > 0: node_definition = nodes_definition[nodeid-1] node_name = node_definition.pop("name", node_name) comment = node_definition.pop("comment", None) disable = node_definition.pop("disable", False) settings = node_definition.pop("external_pki_certificate_settings", None) node_list.append(Node._create(node_name, node_type, nodeid, loopback_ndi, settings, disable, comment)) else: node_list.append(Node._create(node_name, node_type, nodeid, loopback_ndi)) domain_server_list = [] if domain_server_address: for num, server in enumerate(domain_server_address): try: domain_server = {"rank": num, "ne_ref": server.href} except AttributeError: domain_server = {"rank": num, "value": server} domain_server_list.append(domain_server) # Set log server reference, if not explicitly provided if not log_server_ref and node_type != "virtual_fw_node": log_server_ref = LogServer.objects.first().href base_cfg = { "name": name, "nodes": node_list, "domain_server_address": domain_server_list, "log_server_ref": log_server_ref, "physicalInterfaces": physical_interfaces, } if enable_antivirus: antivirus = { "antivirus": { "antivirus_enabled": True, "antivirus_update": "daily", "virus_log_level": "stored", "virus_mirror": "update.nai.com/Products/CommonUpdater", } } base_cfg.update(antivirus) if enable_gti: # gti_settings element replaced since smc 6.6 and api 6.6 # and since smc 6.7 and api 6.5 if is_smc_version_less_than_or_equal("6.6") \ and is_api_version_less_than_or_equal("6.5"): gti = {"gti_settings": {"file_reputation_context": "gti_cloud_only"}} else: gti = {"file_reputation_settings": {"file_reputation_context": "gti_cloud_only"}} base_cfg.update(gti) if sidewinder_proxy_enabled: base_cfg.update(sidewinder_proxy_enabled=True) # We only want to set known host lists when SSM is enabled if known_host_lists: if isinstance(known_host_lists, list): base_cfg.update(known_host_lists_ref=known_host_lists) else: split_known_host_lists = known_host_lists.split(",") base_cfg.update(known_host_lists_ref=split_known_host_lists) if default_nat: base_cfg.update(default_nat=True) if location_ref: base_cfg.update(location_ref=location_helper(location_ref) if location_ref else None) if snmp_agent: snmp_agent_ref = SNMPAgent(snmp_agent.pop("snmp_agent_ref")).href base_cfg.update(snmp_agent_ref=snmp_agent_ref, **snmp_agent) if enable_ospf: if not ospf_profile: # get default profile ospf_profile = OSPFProfile("Default OSPFv2 Profile").href ospf = { "dynamic_routing": {"ospfv2": {"enabled": True, "ospfv2_profile_ref": ospf_profile}} } base_cfg.update(ospf) if link_usage_profile: if isinstance(LinkUsageProfile, link_usage_profile): base_cfg.update(link_usage_profile_ref=link_usage_profile.href) # is already a href else: base_cfg.update(link_usage_profile_ref=link_usage_profile) if ntp_settings is not None: if isinstance(ntp_settings, dict): base_cfg.update(ntp_settings=ntp_settings) else: base_cfg.update(ntp_settings.data) if timezone is not None: # check timezone is valid if timezone in pytz.all_timezones: timezone = {"timezone": timezone} base_cfg.update(timezone) else: raise CreateElementFailed( "Timezone is invalid:" + timezone ) if min_smc_version("6.6") and lldp_profile is not None: if isinstance(lldp_profile, LLDPProfile): base_cfg.update(lldp_profile_ref=lldp_profile.href) # is already a href else: base_cfg.update(lldp_profile_ref=lldp_profile) if min_smc_version("7.0"): if discard_quic_if_cant_inspect: discard_quic = {"discard_quic_if_cant_inspect": "true"} else: discard_quic = {"discard_quic_if_cant_inspect": "false"} base_cfg.update(discard_quic) base_cfg.update(kw, comment=comment) # Add rest of kwargs return base_cfg @property def type(self): if not self._meta: self.href return self._meta.type @property def version(self): """ Version of this engine. Can be none if the engine has not been initialized yet. :rtype: str or None """ return getattr(self, "engine_version", None) @property def installed_policy(self): """ Return the name of the policy installed on this engine. If no policy, None will be returned. :rtype: str or None """ for node in self.nodes: return node.health.installed_policy
[docs] def rename(self, name): """ Rename the firewall engine, nodes, and internal gateway (VPN gw) :return: None """ for node in self.nodes: node.rename(name) self.update(name=name) self.vpn.rename(name)
@property def log_server(self): """ Log server for this engine. :return: The specified log server :rtype: LogServer """ return Element.from_href(self.log_server_ref) @log_server.setter def log_server(self, value): self.data.update(log_server_ref=location_helper(value)) @property def location(self): """ The location for this engine. May be None if no specific location has been assigned. :param value: location to assign engine. Can be name, str href, or Location element. If name, it will be automatically created if a Location with the same name doesn't exist. :raises UpdateElementFailed: failure to update element :return: Location element or None """ location = Element.from_href(self.location_ref) if location and location.name == "Default": return None return location @location.setter def location(self, value): self.data.update(location_ref=location_helper(value)) @property def geolocation(self): """ Return the geolocation for the given engine. This attribute requires at least SMC version >= 6.5.x. If no geolocation is assigned or the SMC is not a correct version this will return None. If setting a new geolocation, call update() after modification. Example:: >>> from smc.elements.other import Geolocation >>> engine = Engine('azure') >>> geo = Geolocation.create(name='MyGeo', latitude='44.97997', longitude='-93.26384') >>> geo Geolocation(name=MyGeo) >>> engine.geolocation = geo >>> engine.update() >>> engine.geolocation Geolocation(name=MyGeo) :param Geolocation value: Geolocation to assign engine. Can be str href or type Geolocation element. :rtype: Geolocation or None """ return self.from_href(getattr(self, "geolocation_ref", None)) @geolocation.setter def geolocation(self, value): self.data.update(geolocation_ref=element_resolver(value)) @property def default_nat(self): """ Configure default nat on the engine. Default NAT provides automatic NAT without the requirement to add specific NAT rules. This is a more common configuration for outbound traffic. Inbound traffic will still require specific NAT rules for redirection. :rtype: DefaultNAT """ if "default_nat" in self.data: return DefaultNAT(self) raise UnsupportedEngineFeature("This engine type does not support default NAT.") @property def dns(self): """ Current DNS entries for the engine. Add and remove DNS entries. This resource is iterable and yields instances of :class:`smc.core.addon.DNSEntry`. Example of adding entries:: >>> from smc.elements.servers import DNSServer >>> server = DNSServer.create(name='mydnsserver', address='10.0.0.1') >>> engine.dns.add(['8.8.8.8', server]) >>> engine.update() 'http://172.18.1.151:8082/6.4/elements/single_fw/948' >>> list(engine.dns) [DNSEntry(rank=0,value=8.8.8.8,ne_ref=None), DNSEntry(rank=1,value=None,ne_ref=DNSServer(name=mydnsserver))] :rtype: RankedDNSAddress """ return RankedDNSAddress(self.data.get("domain_server_address", [])) @property def snmp(self): """ SNMP engine settings. SNMP is supported on all engine types, however can be enabled only on NDI interfaces (interfaces that have assigned addresses). :rtype: SNMP """ if not self.type.startswith("virtual"): return SNMP(self) raise UnsupportedEngineFeature( "SNMP is not supported directly on this engine type. If this " "is a virtual engine, SNMP is configured on the master engine." ) @property def ntp_settings(self): """ NTP settings definition for the engine :rtype: NTPSettings """ return NTPSettings(self) @property def automatic_rules_settings(self): """ Represents the container for all automatic rules settings for a cluster. Example of using automatic rules settings: >>> engine = Engine("testme") >>> automatic_rules_settings=engine.automatic_rules_settings >>> update_automatic_rules_settings(allow_auth_traffic=False, allow_no_nat=False) >>> engine.update() >>> engine.automatic_rules_settings.allow_auth_traffic False >>> engine.automatic_rules_settings.allow_listening_interfaces_to_dns_relay_port True :rtype: AutomaticRulesSettings """ return AutomaticRulesSettings(self) @property def connection_timeout(self): """ This is definition of timeout by protocol or by TCP connection state. You can define general timeouts for removing idle connections from the state table, including non-TCP communications that are handled like connections. The timeout prevents wasting engine resources on storing information about abandoned connections. Timeouts are a normal way to clear traffic information with protocols that have no closing mechanism.Timeouts do not affect active connections. The connections are kept in the state table as long as the interval of packets within a connection is shorter than the timeouts set. Example of using idle time out settings: >>> engine = Engine("testme") >>> connection_timeout=engine.connection_timeout >>> connection_timeout.add('tcp_syn_seen',120) >>> engine.connection_timeout.data {'connection_timeout': [{'protocol': 'tcp', 'timeout': 1800}, {'protocol': 'udp', 'timeout': 50}, {'protocol': 'icmp', 'timeout': 5}, {'protocol': 'other', 'timeout': 180}, {'protocol': 'tcp_syn_seen', 'timeout': 120}]} >>> engine.update() >>> engine.connection_timeout.data >>> connection_timeout.remove('tcp_syn_seen') {'connection_timeout': [{'protocol': 'tcp', 'timeout': 1800}, {'protocol': 'udp', 'timeout': 50}, {'protocol': 'icmp', 'timeout': 5}, {'protocol': 'other', 'timeout': 180}]} :rtype: IdleTimeout """ return IdleTimeout(self) @property def local_log_storage(self): """ Local Log Storage Settings for not virtual engines. Example of using local log storage settings: >>> engine = Engine("testme") >>> local_log_storage=engine.local_log_storage >>> local_log_storage.local_log_storage_activated True >>> local_log_storage.lls_max_time 10 >>> local_log_storage.update(lls=20_max_time) >>> engine.update() >>> local_log_storage=engine.local_log_storage >>> local_log_storage.lls_max_time 20 :rtype LocalLogStorageSettings """ if self.type == "virtual_fw": raise UnsupportedEngineFeature( "Local Log Storage settings are not supported on virtual engines.") return LocalLogStorageSettings(self) @property def log_moderation(self): """ This is the definition of Log Compression for the engine or for an interface. You can also configure Log Compression to save resources on the engine. By default, each generated Antispoofing and Discard log entry is logged separately and displayed as a separate entry in the Logs view. Log Compression allows you to define the maximum number of separately logged entries.When the defined limit is reached, a single Antispoofing log entry or Discard log entry is logged. The single entry contains information on the total number of the generated Antispoofing log entries or Discard log entries. After this, logging returns to normal and all the generated entries are once more logged and displayed separately. Example of using log moderation settings: >>> engine = Engine("testme") >>> log_moderation_obj=engine.log_moderation >>> log_moderation_obj.get(1)["rate"] 100 >>> log_moderation_obj.get(1)["burst"] 1000 >>> log_moderation_obj.add(rate=200,burst=1100,log_event=2) >>> engine.update(log_spooling_policy='discard') >>> log_moderation_obj=engine.log_moderation >>> log_moderation_obj.get(2)["rate"] 200 >>> log_moderation_obj.get(2)["burst"] 1100 :rtype LogModeration """ return LogModeration(self) @property def antivirus(self): """ AntiVirus engine settings. Note that for virtual engines the AV settings are configured on the Master Engine. Get current status:: engine.antivirus.status :raises UnsupportedEngineFeature: Invalid engine type for AV :rtype: AntiVirus """ if not self.type.startswith("virtual"): return AntiVirus(self) raise UnsupportedEngineFeature( "Antivirus is not supported directly on this engine type. If this " "is a virtual engine, AV is configured on the master engine." ) @property def file_reputation(self): """ File reputation status on engine. Note that for virtual engines the AV settings are configured on the Master Engine. Get current status:: engine.file_reputation.status :raises UnsupportedEngineFeature: Invalid engine type for file rep :rtype: FileReputation """ if not self.type.startswith("virtual"): return FileReputation(self) raise UnsupportedEngineFeature( "GTI should be enabled on the Master Engine not directly on the " "virtual engine." ) @property def sidewinder_proxy(self): """ Configure Sidewinder Proxy settings on this engine. Sidewinder proxy is supported on layer 3 engines and require SMC and engine version >= 6.1. Get current status:: engine.sidewinder_proxy.status :raises UnsupportedEngineFeature: requires layer 3 engine :rtype: SidewinderProxy """ if "sidewinder_proxy_enabled" in self.data: return SidewinderProxy(self) raise UnsupportedEngineFeature( "Sidewinder Proxy requires a layer 3 engine and version >= v6.1." ) @property def known_host_lists(self): """ Configure SSH known host lists on the engine. Can only be set if Sidewinder Proxy is enabled. :raises MissingRequiredInput: requires sidewinder proxy to be enabled :rtype: KnownHostLists """ if self.data["sidewinder_proxy_enabled"] and "known_host_lists_ref" in self.data: return SSHKnownHostsLists(self) raise MissingRequiredInput( "SSH Known Host Lists require Sidewinder Proxy to be enabled." ) @property def url_filtering(self): """ Configure URL Filtering settings on the engine. Get current status:: engine.url_filtering.status :raises UnsupportedEngineFeature: not supported on virtual engines :rtype: UrlFiltering """ if not self.type.startswith("virtual"): return UrlFiltering(self) raise UnsupportedEngineFeature( "Enabling URL Filtering should be done on the Master Engine, not " "directly on the virtual engine." ) @property def ztna_connector(self): if not min_smc_version("7.0"): raise UnsupportedEngineFeature("Need at least 7.0 version of the SMC") if self.type not in ("single_fw", "fw_cluster", "virtual_fw"): raise UnsupportedEngineFeature( "Enabling ZTNA Connector should be done only on single_fw, " "fw_cluster or virtual_fw") return ZTNAConnector(self) @property def sandbox(self): """ Configure sandbox settings on the engine. Get current status:: engine.sandbox.status :raises UnsupportedEngineFeature: not supported on virtual engine :rtype: Sandbox """ if not self.type.startswith("virtual"): return Sandbox(self) raise UnsupportedEngineFeature( "Enabling sandbox should be done on the Master Engine, not " "directly on the virtual engine." ) @property def endpoint_integration(self): """ Endpoint Integration status on engine. Note that for master engines the endpoint integrations settings are configured on Virtual Engines. Get current status:: engine.endpoint_integration.status :raises UnsupportedEngineFeature: Invalid engine type for file rep :rtype: EndpointIntegration """ if not self.type.startswith("master"): return EndpointIntegration(self) raise UnsupportedEngineFeature( "Endpoint Integration cannot be configured on master engine" ) @property def dns_relay(self): """ Enable, disable or get status for the DNS Relay Service on this engine. You must still separately configure the :class:`smc.elements.profiles.DNSRelayProfile` that the engine references. :raises UnsupportedEngineFeature: unsupported feature on this engine type. :rtype: DNSRelay """ if "dns_relay_interface" in self.data: return DNSRelay(self) raise UnsupportedEngineFeature("DNS Relay requires a layer 3 engine and version >= v6.2.") @property def tls_inspection(self): """ TLS Inspection settings manage certificates assigned to the engine for TLS server decryption (inbound) and TLS client decryption (outbound). In order to enable either, you must first assign certificates to the engine. Example of adding TLSServerCredentials to an engine:: >>> engine = Engine('myfirewall') >>> tls = TLSServerCredential('server2.test.local') >>> engine.tls_inspection.add_tls_credential([tls]) >>> engine.tls_inspection.server_credentials [TLSServerCredential(name=server2.test.local)] :rtype: TLSInspection """ return TLSInspection(self) @property def client_inspection(self): """ Client TLS Inspection settings manage certificates assigned to the engine for TLS client decryption (outbound). In order to enable either, you must first assign certificates to the engine. Example of adding ClientInspection to an engine:: >>> engine = Engine('myfirewall') >>> tls = ClientInspection('client.test.local') >>> engine.client_inspection.enable(tls) >>> engine.update() :rtype: ClientInspection """ return ClientInspection(self) @property def ospf(self): return self.dynamic_routing.ospf @property def bgp(self): return self.dynamic_routing.bgp @property def dynamic_routing(self): """ Dynamic Routing entry point. Access BGP, OSPF configurations :raises UnsupportedEngineFeature: Only supported on layer 3 engines :rtype: DynamicRouting """ if "dynamic_routing" in self.data: return DynamicRouting(self) raise UnsupportedEngineFeature("Dynamic routing is only supported on layer 3 engine types") @property def l2fw_settings(self): """ Layer 2 Firewall Settings make it possible for a layer 3 firewall to run specified interfaces in layer 2 mode. This requires that a layer 2 interface policy is assigned to the engine and that inline_l2fw interfaces are created. :raises UnsupportedEngineFeature: requires layer 3 engine :rtype: Layer2Settings """ if "l2fw_settings" in self.data: return Layer2Settings(self) raise UnsupportedEngineFeature( "Layer2FW settings are only supported on layer 3 engines using " "engine and SMC version >= 6.3" ) @property def nodes(self): """ Return a list of child nodes of this engine. This can be used to iterate to obtain access to node level operations :: >>> print(list(engine.nodes)) [Node(name=myfirewall node 1)] >>> engine.nodes.get(0) Node(name=myfirewall node 1) :return: nodes for this engine :rtype: SubElementCollection(Node) """ resource = sub_collection(self.get_relation("nodes"), Node) resource._load_from_engine(self, "nodes") return resource @property def granted_permissions(self): """ Retrieve the access control list permissions for this engine instance. :: >>> from smc.core.engine import Engine >>> engine = Engine('myfirewall') >>> for x in engine.permissions.granted_permissions: ... print(x) ... AccessControlList(name=ALL Elements) AccessControlList(name=ALL Firewalls) :raises UnsupportedEngineFeature: requires SMC version >= 6.1 :return: access control list permissions :rtype: list(AccessControlList) """ acl_list = list(AccessControlList.objects.all()) def acl_map(elem_href): for elem in acl_list: if elem.href == elem_href: return elem for acl in self.permissions.granted_access_control_list: yield (acl_map(acl)) @property def permissions(self): """ Retrieve the permissions for this engine instance. :rtype: GrantedElementPermissions .. note:: This method has changed in fp-NGFW-SMC-python >= 1.0.24 so need to make your script compatible with this change." .. seealso:: :class:`smc.administration.access_rights.GrantedElementPermissions`. """ data = self.make_request(UnsupportedEngineFeature, resource="permissions") return GrantedElementPermissions(data) @permissions.setter def permissions(self, permissions): """ Update the permissions for this engine instance. Example to update permission: >>> algiers = Engine('Algiers') >>> all_elements_acl = AccessControlList('ALL Elements').href >>> all_fws_acl = AccessControlList('ALL Engines').href >>> permissions_object = algiers.permissions >>> permissions_object.update(granted_access_control_list=[all_elements_acl, >>> all_fws_acl]) >>> algiers.permissions = permissions_object Example to create permission: >>> admin_user=list(AdminUser.objects.all())[0] >>> admin_domain=list(AdminDomain.objects.all())[0] >>> roles=list(Role.objects.all()) >>> cluster_ref = algiers.href >>> all_elements_acl = AccessControlList('ALL Elements') >>> all_fws_acl = AccessControlList('ALL Engines') >>> role_container = [AdminRoleContainer.create(roles=roles, >>> granted_domain=admin_domain,admin=admin_user)] >>> permissions_object = GrantedElementPermissions.create(cluster_ref=cluster_ref, >>> granted_access_control_list=[all_elements_acl, all_fws_acl], >>> role_containers=role_container) >>> algiers.permissions = permissions_object .. note:: This method has changed in fp-NGFW-SMC-python >= 1.0.24, so need to make your script compatible with this change." .. seealso:: :py:class:`smc.administration.access_rights.GrantedElementPermissions` and :py:class:`smc.administration.access_rights.AdminRoleContainer` :param permissions: permissions object """ if is_smc_version_equal("7.0"): raise UpdateElementFailed("Update permission is not supported in smc 7.0.") etag = self.make_request(UnsupportedEngineFeature, resource="permissions", raw_result=True).etag.strip('"') self.make_request(UnsupportedEngineFeature, resource="permissions", method="update", json=permissions.data, etag=etag) @property def pending_changes(self): """ Pending changes provides insight into changes on an engine that are pending approval or disapproval. Feature requires SMC >= v6.2. :raises UnsupportedEngineFeature: SMC version >= 6.2 is required to support pending changes :rtype: PendingChanges """ if "pending_changes" in self.data.links: return PendingChanges(self) raise UnsupportedEngineFeature( "Pending changes is an unsupported feature on this engine: {}".format(self.type) ) @property def lbfilters(self): """ Load balancing filter list :raises UnsupportedEngineFeature: Invalid engine type :rtype: list LBFilter """ if self.type.endswith("cluster"): return [LBFilter(**lbfilter) for lbfilter in self.data.data["lbfilter"]] raise UnsupportedEngineFeature( "Available only for cluster engine" ) @lbfilters.setter def lbfilters(self, lbfilter): """ Update the lbfilter list for this engine instance. Load balancing filter list :param list LBFilter lbfilter: Load balancing filter list :raises UnsupportedEngineFeature: Invalid engine type """ if self.type.endswith("cluster"): self.data.data["lbfilter"] = [] for filter in lbfilter: self.data.data["lbfilter"].append(filter.data) else: raise UnsupportedEngineFeature( "Available only for cluster engine" ) @property def lbfilter_useports(self): """ Load Balancing Filter use ports :raises UnsupportedEngineFeature: Invalid engine type :rtype: bool """ if self.type.endswith("cluster"): return self.data.data["lbfilter_useports"] raise UnsupportedEngineFeature( "Available only for cluster engine" ) @lbfilter_useports.setter def lbfilter_useports(self, lbfilter_useports): """ Update the lbfilter_useports value for this engine instance. :param bool useports: the use ports value """ if self.type.endswith("cluster"): self.data.data["lbfilter_useports"] = lbfilter_useports else: raise UnsupportedEngineFeature( "Available only for cluster engine" )
[docs] def alias_resolving(self): """ Alias definitions with resolved values as defined on this engine. Aliases can be used in rules to simplify multiple object creation :: fw = Engine('myfirewall') for alias in fw.alias_resolving(): print(alias, alias.resolved_value) ... (Alias(name=$$ Interface ID 0.ip), [u'10.10.0.1']) (Alias(name=$$ Interface ID 0.net), [u'10.10.0.0/24']) (Alias(name=$$ Interface ID 1.ip), [u'10.10.10.1']) :return: generator of aliases :rtype: Alias """ alias_list = list(Alias.objects.all()) for alias in self.make_request(resource="alias_resolving"): yield Alias._from_engine(alias, alias_list)
[docs] def blacklist(self, src, dst, duration=3600, **kw): """ Add blacklist entry to engine node by name. For blacklist to work, you must also create a rule with action "Apply Blacklist". :param src: source address, with cidr, i.e. 10.10.10.10/32 or 'any' :param dst: destination address with cidr, i.e. 1.1.1.1/32 or 'any' :param int duration: how long to blacklist in seconds :raises EngineCommandFailed: blacklist failed during apply :return: None .. note:: This method requires SMC version >= 6.4 and SMC version <7.0 since this version, "blacklist" is renamed "block_list" """ json_bl_entry = prepare_blacklist(src, dst, duration, **kw) if not is_api_version_less_than_or_equal("6.3"): json_bl_entry = {"entries": [json_bl_entry]} if is_api_version_less_than("7.0"): resource = "blacklist" else: resource = "block_list" self.make_request( EngineCommandFailed, method="create", resource=resource, json=json_bl_entry, )
[docs] def block_list(self, src, dst, duration=3600, **kw): """ Add block_list entry to engine node by name. For block_list to work, you must also create a rule with action "Apply Blocklist". :param src: source address, with cidr, i.e. 10.10.10.10/32 or 'any' :param dst: destination address with cidr, i.e. 1.1.1.1/32 or 'any' :param int duration: how long to block list in seconds :raises EngineCommandFailed: block list failed during apply :return: None .. note:: This method requires SMC version >= 7.0 """ json_bl_entry = prepare_block_list(src, dst, duration, **kw) if not is_api_version_less_than_or_equal("6.3"): json_bl_entry = {"entries": [json_bl_entry]} self.make_request( EngineCommandFailed, method="create", resource="block_list", json=json_bl_entry, )
[docs] def block_list_bulk(self, block_list): """ Add block_list entries to the engine node in bulk. For block_list to work, you must also create a rule with action "Apply Blocklist". First create your block_list entries using :class:`smc.elements.other.Blocklist` then provide the block_list to this method. :param Blocklist block_list : pre-configured block_list entries .. note:: This method requires SMC version >= 7.0 """ self.make_request( EngineCommandFailed, method="create", resource="block_list", json=block_list.entries )
[docs] def blacklist_bulk(self, block_list): """ Add block list entries to the engine node in bulk. For block list to work, you must also create a rule with action "Apply Block List". First create your block_list entries using :class:`smc.elements.other.Blacklist` then provide the block list to this method. :param Blacklist block_list : pre-configured block list entries .. note:: This method requires SMC version >= 6.4 and SMC version <7.0 since this version, "blacklist" is renamed "block_list" """ if is_api_version_less_than("7.0"): resource = "blacklist" else: resource = "block_list" self.make_request(EngineCommandFailed, method="create", resource=resource, json=block_list.entries)
[docs] def block_list_flush(self): """ Flush entire block list for engine :raises EngineCommandFailed: flushing block list failed with reason :return: None .. note:: This method requires SMC version >= 7.0 """ self.make_request(EngineCommandFailed, method="delete", resource="flush_block_list")
[docs] def blacklist_flush(self): """ Flush entire blacklist for engine :raises EngineCommandFailed: flushing blacklist failed with reason :return: None .. note:: This method requires SMC version < 7.0 since this version, "blacklist" is renamed "block_list" """ if is_api_version_less_than("7.0"): self.make_request(EngineCommandFailed, method="delete", resource="flush_blacklist") else: self.make_request(EngineCommandFailed, method="delete", resource="flush_block_list")
[docs] def block_list_show(self, **kw): """ .. versionadded:: 0.5.6 Requires pip install smc-python-monitoring Block list show requires that you install the smc-python-monitoring package. To obtain Blocklist entries from the engine you need to use this extension to plumb the websocket to the session. If you need more granular controls over the block_list such as filtering by source and destination address, use the smc-python-monitoring package directly. Blocklist entries that are returned from this generator have a delete() method that can be called to simplify removing entries. A simple query would look like:: for bl_entry in engine.block_list_show(): print(bl_entry) :param kw: keyword arguments passed to block list query. Common setting is to pass max_recv=20, which specifies how many "receive" batches will be retrieved from the SMC for the query. At most, 200 results can be returned in a single query. If max_recv=5, then 1000 results can be returned if they exist. If less than 1000 events are available, the call will be blocking until 5 receives has been reached. :return: generator of results :rtype: :class:`smc_monitoring.monitors.blocklist.BlocklistEntry` """ try: from smc_monitoring.monitors.blocklist import BlocklistQuery except ImportError: pass else: query = BlocklistQuery(self.name) for record in query.fetch_as_element(**kw): yield record
[docs] def blacklist_show(self, **kw): """ .. versionadded:: 0.5.6 Requires pip install smc-python-monitoring Blacklist show requires that you install the smc-python-monitoring package. To obtain Blacklist entries from the engine you need to use this extension to plumb the websocket to the session. If you need more granular controls over the blacklist such as filtering by source and destination address, use the smc-python-monitoring package directly. Blacklist entries that are returned from this generator have a delete() method that can be called to simplify removing entries. A simple query would look like:: for bl_entry in engine.blacklist_show(): print(bl_entry) :param kw: keyword arguments passed to blacklist query. Common setting is to pass max_recv=20, which specifies how many "receive" batches will be retrieved from the SMC for the query. At most, 200 results can be returned in a single query. If max_recv=5, then 1000 results can be returned if they exist. If less than 1000 events are available, the call will be blocking until 5 receives has been reached. :return: generator of results :rtype: :class:`smc_monitoring.monitors.blacklist.BlacklistEntry` .. note:: This method requires SMC version < 7.0 since this version, "blacklist" is renamed "block_list" """ try: from smc_monitoring.monitors.blacklist import BlacklistQuery except ImportError: pass else: if is_api_version_less_than("7.0"): query = BlacklistQuery(self.name) else: query = BlocklistQuery(self.name) for record in query.fetch_as_element(**kw): yield record
[docs] def remove_alternative_policies(self): """ Remove all alternative policies on engine. """ self.update(href=self.get_relation("remove_alternative_policies"), etag=None)
@property def link_usage_exception_rules(self): """ A collection of link_usage_exception_rules :rtype: list(LinkUsageExceptionRules) """ return [LinkUsageExceptionRules(**nc) for nc in self.data.get("link_usage_exception_rules", [])]
[docs] def query_route(self, source_ref=None, destination_ref=None, source_ip=None, destination_ip=None): """ Allows querying a route for the specific supported engine Options: A. Using Query Parameters: source_ip: the IP Address A.B.C.D corresponding to the source query ip address. destination_ip: the IP Address A.B.C.D corresponding to the destination query ip address B. Using payload to be able to specify source network element uri and/or destination network element uri. Find route for source to destination using ip address >>> engine = Engine('Plano') >>> engine.query_route(source_ip='0.0.0.0', destination_ip= '0.0.0.0') [Routing(name=Interface 1,level=None,type=routing), Routing(name=net-172.31.14.0/24, level=None,type=routing), Routing(name=AT&T Plano Router,level=None,type=routing), Routing(name=Any network,level=None,type=routing)] Find the route using query route with ref >>> list_of_routing = list(Host.objects.all()) >>> host1 = list_of_routing[0] >>> host2 = list_of_routing[1] >>> engine.query_route(source_ref=host1.href, destination_ref=host2.href) :param str source_ref: specify source network element uri :param str destination_ref: destination network element uri :param str source_ip: source ip address :param str destination_ip: destination ip address :return list(Routing): the result pages containing the result routing. """ json = {} if source_ref: json.update(source_ref=source_ref) if destination_ref: json.update(destination_ref=destination_ref) if source_ip: json.update(source_ip=source_ip) if destination_ip: json.update(destination_ip=destination_ip) result = self.make_request(EngineCommandFailed, method="read", resource="query_route", json=json) return [Routing(routing) for entry in result for routing in entry['entry']]
[docs] def add_route(self, gateway=None, network=None, payload=None): """ Add a route to engine. Specify gateway and network. If this is the default gateway, use a network address of 0.0.0.0/0. .. note: This will fail if the gateway provided does not have a corresponding interface on the network. :param str gateway: gateway of an existing interface :param str network: network address in cidr format :param href payload: the payload to add route with href of element Example: {"gateway_ip": X.Y.Z.Z, "network_ip": A.B.C.D} OR {"gateway_ref": href, "network_ref": href} :raises EngineCommandFailed: invalid route, possibly no network :return: None """ if payload: self.make_request( EngineCommandFailed, method="create", resource="add_route", json=payload ) else: # Doing simple add route self.make_request( EngineCommandFailed, method="create", resource="add_route", params={"gateway": gateway, "network": network}, payload={}, )
@property def policy_route(self): """ Configure policy based routes on the engine. :: engine.policy_route.create( source='172.18.2.0/24', destination='192.168.3.0/24', gateway_ip='172.18.2.1') :rtype: PolicyRoute """ if "policy_route" in self.data: return PolicyRoute(self) raise UnsupportedEngineFeature("Policy routing is only supported on layer 3 engine types") @property def routing(self): """ Find all routing nodes within engine:: for routing in engine.routing.all(): for routes in routing: ... Or just retrieve a routing configuration for a single interface:: interface = engine.routing.get(0) :return: top level routing node :rtype: Routing """ return Routing(href=self.get_relation("routing")) @property def routing_monitoring(self): """ Return route table for the engine, including gateway, networks and type of route (dynamic, static). Calling this can take a few seconds to retrieve routes from the engine. Find all routes for engine resource:: >>> engine = Engine('sg_vm') >>> for route in engine.routing_monitoring: ... route ... Route(route_network=u'0.0.0.0', route_netmask=0, route_gateway=u'10.0.0.1', route_type=u'Static', dst_if=1, src_if=-1) ... :raises EngineCommandFailed: routes cannot be retrieved :return: list of route elements :rtype: SerializedIterable(Route) """ try: result = self.make_request(EngineCommandFailed, resource="routing_monitoring") return Route(result) except SMCConnectionError: raise EngineCommandFailed("Timed out waiting for routes")
[docs] def get_session_monitoring(self, sesmon_type, full=True): """ Available for all SMC API Versions but only for SMC Version above 7.1 (7.1 included) Return session monitoring for the requested session monitoring type for the engine Find all routes for engine resource:: :param sesmon_type requested session monitoring type. Possible value are defined in session_monitoring.EngineSessionMonitoringType :optional param full ( default value is true ). When set to false, juste retrieve the log key of each entry ( timestamp, component id, event id ). :raises EngineCommandFailed : session monitoring result cannot be retrieved :return: list of session monitoring entries : session_monitoring.SessionMonitoringResult :rtype: SerializedIterable(Route) Example: from smc.core.session_monitoring import EngineSessionMonitoringType engine.get_session_monitoring(EngineSessionMonitoringType.CONNECTION) """ if is_smc_version_less_than_or_equal("7.0"): raise UnsupportedEngineFeature("Need at least 7.1 version of the SMC") try: params = None if not full: params = dict() params["full"] = False result = self.make_request(EngineCommandFailed, resource=sesmon_type, params=params) if result: return SessionMonitoringResult(sesmon_type, result) return None except SMCConnectionError: raise EngineCommandFailed(f"Timed out waiting for {sesmon_type}")
@property def antispoofing(self): """ Antispoofing interface information. By default is based on routing but can be modified. :: for entry in engine.antispoofing.all(): print(entry) :return: top level antispoofing node :rtype: Antispoofing """ return Antispoofing(href=self.get_relation("antispoofing")) @property def internal_gateway(self): """ Engine level VPN gateway information. This is a link from the engine to VPN level settings like VPN Client, Enabling/disabling an interface, adding VPN sites, etc. Example of adding a new VPN site to the engine's site list with associated networks:: >>> network = Network.get_or_create(name='mynetwork', ipv4_network='1.1.1.0/24') Network(name=mynetwork) >>> engine.internal_gateway.vpn_site.create(name='mynewsite', site_element=[network]) VPNSite(name=mynewsite) :raises UnsupportedEngineFeature: internal gateway is only supported on layer 3 engine types. :return: this engines internal gateway :rtype: InternalGateway """ return self.vpn @property def all_vpns(self): """ Engine level all VPN gateway information. Example: >>> list_of_all_internal_gateways=engine.all_vpns >>> first_vpn_instance= list_of_all_internal_gateways[0] >>> first_vpn_instance.name :raises UnsupportedEngineFeature: internal gateway is only supported on layer 3 engine types. :return: list of engine internal gateways :rtype: List of All VPN Gateway Configuration """ result = self.make_request(UnsupportedEngineFeature, resource="internal_gateway") return [VPN(self, InternalGateway(**gateway)) for gateway in result]
[docs] def create_internal_gateway(self, name, antivirus=None, auto_certificate=None, auto_site_content=None, dhcp_relay=None, end_point=None, firewall=None, gateway_profile=None, ssl_vpn_portal_setting=None, ssl_vpn_proxy=None, ssl_vpn_tunneling=None, trust_all_cas=None, trusted_certificate_authorities=None, vpn_client_mode=None, **kwargs): """ Create internal gateway Example of creating internal gateway: >>> engine.create_internal_gateway("test") :param str name: Name of the internal gateway :param str antivirus: Antivirus :param str auto_certificate: Automated RSA Certificate Management :param str auto_site_content: Indicates whether the site content is automatically generated from the routing view. :param str dhcp_relay: DHCP Relay. :param str end_point: List of end-points. :param str firewall: Firewall :param str gateway_profile: Gateway Profile :param str ssl_vpn_portal_setting: SSL VPN Settings for the Portal. :param str ssl_vpn_proxy: vpn proxy :param str ssl_vpn_tunneling: SSL VPN Settings for the VPN Client. :param str trust_all_cas: Indicates if the EndPoint trust all VPN Certificate Authorities. :param str trusted_certificate_authorities: List of trusted VPN Certificate Authorities. Valid only if the EndPoint does not trust all VPN CAs. :param str vpn_client_mode: VPN Client Mode accepted values given below: *no *ipsec *ssl *both :raises UnsupportedEngineFeature: internal gateway is only supported on layer 3 engine types :return: None :rtype: None """ json = {"name": name} if antivirus: json.update(antivirus=antivirus) if auto_certificate: json.update(auto_certificate=auto_certificate) if auto_site_content: json.update(auto_site_content=auto_site_content) if dhcp_relay: json.update(dhcp_relay=dhcp_relay) if end_point: json.update(end_point=end_point) if firewall: json.update(firewall=firewall) if gateway_profile: json.update(gateway_profile=gateway_profile) if ssl_vpn_portal_setting: json.update(ssl_vpn_portal_setting=ssl_vpn_portal_setting) if ssl_vpn_proxy: json.update(ssl_vpn_proxy=ssl_vpn_proxy) if ssl_vpn_tunneling: json.update(ssl_vpn_tunneling=ssl_vpn_tunneling) if trust_all_cas: json.update(trust_all_cas=trust_all_cas) if trusted_certificate_authorities: json.update( trusted_certificate_authorities=trusted_certificate_authorities) if vpn_client_mode: json.update(vpn_client_mode=vpn_client_mode) if kwargs: json.update(**kwargs) self.make_request( UnsupportedEngineFeature, method="create", resource="internal_gateway", json=json, )
@cacheable_resource def vpn(self): """ VPN configuration for the engine. :raises: UnsupportedEngineFeature: VPN is only supported on layer 3 engines. :rtype: VPN """ return VPN(self) @property def vpn_endpoint(self): """ A VPN endpoint is an address assigned to a layer 3 interface that can be enabled to turn on VPN capabilities. As an interface may have multiple IP addresses assigned, the endpoints are returned based on the address. Endpoints are properties of the engines Internal Gateway. :raises UnsupportedEngineFeature: only supported on layer 3 engines :rtype: SubElementCollection(InternalEndpoint) """ return self.vpn.internal_endpoint @property def vpn_mappings(self): """ .. versionadded:: 0.6.0 Requires SMC version >= 6.3.4 VPN policy mappings (by name) for this engine. This is a shortcut method to determine which VPN policies are used by the firewall. :raises UnsupportedEngineFeature: requires a layer 3 firewall and SMC version >= 6.3.4. :rtype: VPNMappingCollection(VPNMapping) """ return VPNMappingCollection( self.make_request(UnsupportedEngineFeature, resource="vpn_mapping") ) @property def virtual_resource(self): """ Available on a Master Engine only. To get all virtual resources call:: engine.virtual_resource.all() :raises UnsupportedEngineFeature: master engine only :rtype: CreateCollection(VirtualResource) """ resource = create_collection( self.get_relation("virtual_resources", UnsupportedEngineFeature), VirtualResource ) resource._load_from_engine(self, "virtualResources") return resource @property def contact_addresses(self): """ Contact addresses are NAT addresses that are assigned to interfaces. These are used when a component needs to communicate with another component through a NAT'd connection. For example, if a firewall is known by a pubic address but the interface uses a private address, you would assign the public address as a contact address for that interface. .. note:: Contact addresses are only supported with SMC >= 6.2. Obtain all eligible interfaces for contact addressess:: >>> engine = Engine('dingo') >>> for ca in engine.contact_addresses: ... ca ... ContactAddressNode(interface_id=11, interface_ip=10.10.10.20) ContactAddressNode(interface_id=120, interface_ip=120.120.120.100) ContactAddressNode(interface_id=0, interface_ip=1.1.1.1) ContactAddressNode(interface_id=12, interface_ip=3.3.3.3) ContactAddressNode(interface_id=12, interface_ip=17.17.17.17) .. seealso:: :py:mod:`smc.core.contact_address` This is set to a private method because the logic doesn't make sense with respects to how this is configured under the SMC. :rtype: ContactAddressCollection(ContactAddressNode) """ return ContactAddressCollection(self.get_relation("contact_addresses")) @property def interface_options(self): """ Interface options specify settings related to setting primary/ backup management, outgoing, and primary/backup heartbeat interfaces. For example, set primary management interface (this unsets it from the currently assigned interface):: engine.interface_options.set_primary_mgt(10) Obtain the primary management interface:: print(engine.interface_options.primary_mgt) :rtype: InterfaceOptions """ return InterfaceOptions(self) @property def interface(self): """ Get all interfaces, including non-physical interfaces such as tunnel or capture interfaces. These are returned as Interface objects and can be used to load specific interfaces to modify, etc. :: for interfaces in engine.interface: ...... :rtype: InterfaceCollection See :class:`smc.core.interfaces.Interface` for more info """ return InterfaceCollection(self) @property def physical_interface(self): """ Returns a PhysicalInterface. This property can be used to add physical interfaces to the engine. For example:: engine.physical_interface.add_inline_interface(....) engine.physical_interface.add_layer3_interface(....) :raises UnsupportedInterfaceType: engine doesn't support this type :rtype: PhysicalInterfaceCollection """ return PhysicalInterfaceCollection(self) @property def virtual_physical_interface(self): """Master Engine virtual instance only A virtual physical interface is for a master engine virtual instance. This interface type is just a subset of a normal physical interface but for virtual engines. This interface only sets Auth_Request and Outgoing on the interface. To view all interfaces for a virtual engine:: for intf in engine.virtual_physical_interface: print(intf) :raises UnsupportedInterfaceType: supported on virtual engines only :rtype: VirtualPhysicalInterfaceCollection """ return VirtualPhysicalInterfaceCollection(self) @property def tunnel_interface(self): """ Get only tunnel interfaces for this engine node. :raises UnsupportedInterfaceType: supported on layer 3 engine only :rtype: TunnelInterfaceCollection """ return TunnelInterfaceCollection(self) @property def vpn_broker_interface(self): """ Get only vpn broker interfaces for this engine node. :raises UnsupportedInterfaceType: supported on layer 3 engine only :rtype: VPNBrokerInterfaceCollection """ return VPNBrokerInterfaceCollection(self) @property def loopback_interface(self): """ Retrieve any loopback interfaces for this engine. Loopback interfaces are only supported on layer 3 firewall types. Retrieve all loopback addresses:: for loopback in engine.loopback_interface: print(loopback) :raises UnsupportedInterfaceType: supported on layer 3 engine only :rtype: LoopbackCollection """ if self.type in ("single_fw", "fw_cluster", "virtual_fw"): return LoopbackCollection(self) raise UnsupportedInterfaceType( "Loopback addresses are only supported on layer 3 firewall types" ) @property def modem_interface(self): """ Get only modem interfaces for this engine node. :raises: UnsupportedInterfaceType: modem interfaces are only supported on layer 3 engines :return: list of dict entries with href,name,type, or None """ return self.make_request(UnsupportedInterfaceType, resource="modem_interface") @property def adsl_interface(self): """ Get only adsl interfaces for this engine node. :raises UnsupportedInterfaceType: adsl interfaces are only supported on layer 3 engines :return: list of dict entries with href,name,type, or None """ return self.make_request(UnsupportedInterfaceType, resource="adsl_interface") @property def wireless_interface(self): """ Get only wireless interfaces for this engine node. :raises UnsupportedInterfaceType: wireless interfaces are only supported on layer 3 engines :return: list of dict entries with href,name,type, or None """ return self.make_request(UnsupportedInterfaceType, resource="wireless_interface") @property def switch_physical_interface(self): """ Get only switch physical interfaces for this engine node. This is an iterable property:: for interface in engine.switch_physical_interface: ... Or you can fetch a switch port interface/module directly by using the generic interface property:: engine.interface.get('SWP_0') Or through this property directly:: engine.switch_physical_interface.get('SWP_0') :raises UnsupportedInterfaceType: switch interfaces are only supported on specific firewall models :return: list of dict entries with href,name,type, or None """ return SwitchInterfaceCollection(self) @property def lldp_profile(self): """ It represents a set of attributes used for configuring LLDP(Link Layer Discovery Protocol). LLDP information is advertised by devices at a fixed interval in the form of LLDP data units represented by TLV structures. :param value: LLDP Profile to assign engine. Can be str href, or LLDPProfile element. :raises UpdateElementFailed: failure to update element :return: LLDPProfile element or None """ return Element.from_href(self.lldp_profile_ref) @lldp_profile.setter def lldp_profile(self, value): if min_smc_version("6.6"): if isinstance(value, LLDPProfile): self.data.update(lldp_profile=value.href) else: self.data.update(lldp_profile=value) @property def link_usage_profile(self): """ Represent link usage profile :param value: Link usage profile to assign engine. Can be str href, or LinkUsageProfile element. :raises UpdateElementFailed: failure to update element :return: LinkUsageProfile element or None """ return Element.from_href(self.link_usage_profile_ref) @link_usage_profile.setter def link_usage_profile(self, value): if isinstance(value, LinkUsageProfile): self.data.update(link_usage_profile_ref=value.href) else: self.data.update(link_usage_profile_ref=value) @property def discard_quic_if_cant_inspect(self): """ Discard or allow QUIC if inspection is impossible :rtype: bool """ if not self.type.startswith("master"): return self.data.get("discard_quic_if_cant_inspect") raise UnsupportedEngineFeature( "This engine type does not support discard_quic_if_cant_inspect.") @discard_quic_if_cant_inspect.setter def discard_quic_if_cant_inspect(self, value): if min_smc_version("7.0"): self.data["discard_quic_if_cant_inspect"] = value
[docs] def add_interface(self, interface, **kw): """ Add interface is a lower level option to adding interfaces directly to the engine. The interface is expected to be an instance of Layer3PhysicalInterface, Layer2PhysicalInterface, TunnelInterface, or ClusterInterface. The engines instance cache is flushed after this call is made to provide an updated cache after modification. .. seealso:: :class:`smc.core.engine.interface.update_or_create` :param PhysicalInterface,TunnelInterface interface: instance of pre-created interface :return: None """ params = None if "params" in kw: params = kw.pop("params") self.make_request( EngineCommandFailed, method="create", href=self.get_relation(interface.typeof), json=interface, params=params, ) self._del_cache()
[docs] def refresh( self, timeout=3, wait_for_finish=False, preserve_connections=True, generate_snapshot=True, **kw ): """ Refresh existing policy on specified device. This is an asynchronous call that will return a 'follower' link that can be queried to determine the status of the task. :: poller = engine.refresh(wait_for_finish=True) while not poller.done(): poller.wait(5) print('Percentage complete {}%'.format(poller.task.progress)) :param int timeout: timeout between queries :param bool wait_for_finish: poll the task waiting for status :param bool preserve_connections: flag to preserve connections (True by default) :param bool generate_snapshot: flag to generate snapshot (True by default) :raises TaskRunFailed: refresh failed, possibly locked policy :rtype: TaskOperationPoller """ kw.update({"params": {"generate_snapshot": generate_snapshot, "preserve_connections": preserve_connections}}) return Task.execute(self, "refresh", timeout=timeout, wait_for_finish=wait_for_finish, **kw)
[docs] def upload( self, policy=None, timeout=5, wait_for_finish=False, preserve_connections=True, generate_snapshot=True, **kw ): """ Upload policy to engine. This is used when a new policy is required for an engine, or this is the first time a policy is pushed to an engine. If an engine already has a policy and the intent is to re-push, then use :py:func:`refresh` instead. The policy argument can use a wildcard * to specify in the event a full name is not known:: engine = Engine('myfw') task = engine.upload('Amazon*', wait_for_finish=True) for message in task.wait(): print(message) :param str policy: name of policy to upload to engine; if None, current policy :param bool wait_for_finish: poll the task waiting for status :param int timeout: timeout between queries :param bool preserve_connections: flag to preserve connections (True by default) :param bool generate_snapshot: flag to generate snapshot (True by default) :raises TaskRunFailed: upload failed with reason :rtype: TaskOperationPoller """ return Task.execute( self, "upload", params={ "filter": policy, "preserve_connections": preserve_connections, "generate_snapshot": generate_snapshot, }, timeout=timeout, wait_for_finish=wait_for_finish, **kw )
[docs] def upload_alternative_slot( self, alternative_slot=None, policy=None, timeout=5, wait_for_finish=False, generate_snapshot=True, **kw ): """ Upload policy to engine alternative slot. This is used when multiple policies are required for an engine. If an engine already has a policy and the intent is to re-push, then use :py:func:`refresh` instead. The policy argument can use a wildcard * to specify in the event a full name is not known:: engine = Engine('myfw') task = engine.upload_alternative_slot(1, 'Amazon*', wait_for_finish=True) for message in task.wait(): print(message) :param int alternative_slot: Slot of policy to upload to engine(1 to 3) :param str policy: name of policy to upload to engine; if None, current policy :param bool wait_for_finish: poll the task waiting for status :param int timeout: timeout between queries :param bool generate_snapshot: flag to generate snapshot (True by default) :raises TaskRunFailed: upload failed with reason :rtype: TaskOperationPoller """ return Task.execute( self, "upload", params={ "alternative_slot": alternative_slot, "filter": policy, "generate_snapshot": generate_snapshot, }, timeout=timeout, wait_for_finish=wait_for_finish, **kw )
[docs] def generate_snapshot(self, filename="snapshot.zip"): """ Generate and retrieve a policy snapshot from the engine This is blocking as file is downloaded :param str filename: name of file to save file to, including directory path :raises EngineCommandFailed: snapshot failed, possibly invalid filename specified :return: None """ try: self.make_request(EngineCommandFailed, resource="generate_snapshot", filename=filename) except IOError as e: raise EngineCommandFailed("Generate snapshot failed: {}".format(e))
@property def snapshots(self): """ References to policy based snapshots for this engine, including the date the snapshot was made :raises EngineCommandFailed: failure downloading, or IOError :rtype: SubElementCollection(Snapshot) """ return sub_collection(self.get_relation("snapshots", EngineCommandFailed), Snapshot) def __unicode__(self): return u"{0}(name={1})".format(lookup_class(self.type).__name__, self.name)
[docs] def ldap_replication(self, enable): """ Enable or disable LDAP replication :raises EngineCommandFailed: the LDAP replication is already enabled or disabled :param boolean enable: True enable the LDAP replication False disable it """ self.make_request( EngineCommandFailed, method="update", resource="ldap_replication", params={"enable": enable}, )
[docs] def generate_and_sign_user_authentication_certificate(self): """ Generate and internally sign User Authentication certificate. """ return Task.execute( self, "web_auth_https_generate_and_sign_certificate", wait_for_finish=True )
[docs] def delete_user_authentication_certificate(self): """ Delete the certificate if any is defined for this component. """ self.make_request(method="delete", resource="web_auth_https_delete_certificate")
[docs] def delete_user_authentication_certificate_request(self): """ Delete the certificate request if any is defined for this component. """ self.make_request(method="delete", resource="web_auth_https_delete_certificate_request")
[docs] def export_user_authentication_certificate(self, filename=None): """ Export the certificate if any is defined for this component. """ result = self.make_request( CertificateExportError, raw_result=True, resource="web_auth_https_export_certificate" ) if filename is not None: save_to_file(filename, result.content) return return result.content
[docs] def generate_user_authentication_certificate_request(self): """ Export the certificate request for the node when working external CA. This can return None if the engine type does not have a certificate request. :raises CertificateExportError: error exporting certificate :rtype: str or None """ return self.make_request( CertificateExportError, method="create", resource="web_auth_https_generate_certificate_request" )
[docs] def export_user_authentication_certificate_request(self, filename=None): """ Export the certificate request if any is defined for this component. """ result = self.make_request( CertificateExportError, raw_result=True, resource="web_auth_https_export_certificate_request" ) if filename is not None: save_to_file(filename, result.content) return return result.content
[docs] def user_authentication_import_certificate(self, certificate): """ Import a valid certificate. Certificate can be either a file path or a string of the certificate. If string certificate, it must include the -----BEGIN CERTIFICATE----- string. :param str certificate: fully qualified path or string :raises CertificateImportError: failure to import cert with reason :raises IOError: file not found, permissions, etc. """ self.make_request( CertificateImportError, method="create", resource="web_auth_https_import_certificate", headers={"content-type": "multipart/form-data"}, files={ # decode certificate or use it as it is "signed_certificate": open(certificate, "rb") if not pem_as_string(certificate) else certificate }, )
[docs] class VPNMappingCollection(BaseIterable): def __init__(self, vpns): mappings = vpns.get("vpnMappings") _mappings = [] if mappings: for entry in mappings: vpn_mapping = entry.get("vpn_mapping_entry") vpn_mapping.setdefault("gateway_nodes_usage", {}) _mappings.append(VPNMapping(**vpn_mapping)) super(VPNMappingCollection, self).__init__(_mappings)
[docs] class VPNMapping(namedtuple("VPNMapping", "gateway_ref vpn_ref gateway_nodes_usage")): """ A VPN Mapping represents Policy Based VPNs associated with this engine. This simplifies finding references where an engine is used within a VPN without iterating through existing VPNs to find the engine. """ __slots__ = () @property def internal_gateway(self): """ Return the engines internal gateway as element :rtype: InternalGateway """ return Element.from_href(self.gateway_ref) @property def vpn(self): """ The VPN policy for this engine mapping :rtype: PolicyVPN """ return Element.from_href(self.vpn_ref) @property def is_central_gateway(self): """ Is this engine a central gateway in the VPN policy :rtype: bool """ return "central_gateway_node_ref" in self.gateway_nodes_usage @property def _central_gateway(self): """ Return the central gateway tree node as href. This can be used to simplify removal of the element from the specified VPN. You must first open the VPN policy then save and close. :return: GatewayTreeNode href :rtype: str """ return self.gateway_nodes_usage.get("central_gateway_node_ref", None) @property def is_satellite_gateway(self): """ Is this engine a satellite gateway in the VPN policy :rtype: bool """ return "satellite_gateway_node_ref" in self.gateway_nodes_usage @property def _satellite_gateway(self): """ Return the satellite gateway tree node href. This can be used to simplify removal of the element from the specified VPN. You must first open the VPN policy then save and close. :return: GatewayTreeNode href :rtype: str """ return self.gateway_nodes_usage.get("satellite_gateway_node_ref", None) @property def is_mobile_gateway(self): """ Is the engine specified as a mobile gateway in the Policy VPN configuration :rtype: bool """ return "mobile_gateway_node_ref" in self.gateway_nodes_usage @property def _mobile_gateway(self): """ Return the mobile gateway tree href. This can be used to simplify removal of the element from the specified VPN. You must first open the VPN policy then save and close. :return: GatewayTreeNode href :rtype: str """ return self.gateway_nodes_usage.get("mobile_gateway_node_ref", None) def __str__(self): return str("VPNMapping(vpn={})".format(self.vpn))
[docs] class VPN(object): """ VPN is the top level interface to all engine based VPN settings. To enable IPSEC, SSL or SSL VPN on the engine, enable on the endpoint. """ def __init__(self, engine, internal_gateway=None): self.engine = engine if internal_gateway: self.internal_gateway = internal_gateway else: result = self.engine.make_request(UnsupportedEngineFeature, resource="internal_gateway") self.internal_gateway = InternalGateway(**result[0])
[docs] def rename(self, name): """ Rename the internal gateway. :param str name: new name for internal gateway :return: None """ self.internal_gateway.rename(name) # Engine update changes this ETag
[docs] def remove(self): """ Rename the internal gateway. :param str name: new name for internal gateway :return: None """ self.internal_gateway.delete() # Engine update changes this ETag
@property def name(self): return self.internal_gateway.name @property def vpn_client(self): """ VPN Client settings for this engine. Alias for internal_gateway. :rtype: InternalGateway """ return self.internal_gateway @property def sites(self): """ VPN sites configured for this engine. Using sub element methods simplify fetching sites of interest:: engine = Engine('sg_vm') mysite = engine.vpn.sites.get_contains('inter') print(mysite) :rtype: CreateCollection(VPNSite) """ return create_collection(self.internal_gateway.get_relation("vpn_site"), VPNSite)
[docs] def add_site(self, name, site_elements=None): """ Add a VPN site with site elements to this engine. VPN sites identify the sites with protected networks to be included in the VPN. Add a network and new VPN site:: >>> net = Network.get_or_create(name='wireless', ipv4_network='192.168.5.0/24') >>> engine.vpn.add_site(name='wireless', site_elements=[net]) VPNSite(name=wireless) >>> list(engine.vpn.sites) [VPNSite(name=dingo - Primary Site), VPNSite(name=wireless)] :param str name: name for VPN site :param list site_elements: network elements for VPN site :type site_elements: list(str,Element) :raises ElementNotFound: if site element is not found :raises UpdateElementFailed: failed to add vpn site :rtype: VPNSite .. note:: Update is immediate for this operation. """ site_elements = site_elements if site_elements else [] return self.sites.create(name, site_elements)
@property def internal_endpoint(self): """ Internal endpoints to enable VPN for the engine. :rtype: SubElementCollection(InternalEndpoint) """ return sub_collection( self.internal_gateway.get_relation("internal_endpoint"), InternalEndpoint ) @property def loopback_endpoint(self): """ Internal Loopback endpoints to enable VPN for the engine. :rtype: SubElementCollection(InternalEndpoint) """ return sub_collection( self.internal_gateway.get_relation("loopback_endpoint"), InternalEndpoint ) @property def gateway_profile(self): """ Gateway Profile for this VPN. This is only a valid setting on layer 3 firewalls. :rtype: GatewayProfile """ return Element.from_href(self.internal_gateway.gateway_profile) @property def gateway_settings(self): """ A gateway settings profile defines VPN specific settings related to timers such as negotiation retries (min, max) and mobike settings. Gateway settings are only present on layer 3 FW types. :rtype: GatewaySettings .. note:: This can return None on layer 3 firewalls if VPN is not enabled. """ return Element.from_href(self.engine.data.get("gateway_settings_ref")) @property def gateway_certificate(self): """ A Gateway Certificate is used by the engine for securing communications such as VPN. You can also check the expiration, view the signing CA and renew the certificate from this element. :return: GatewayCertificate :rtype: list """ return [ GatewayCertificate.from_href(cert.get('href')) for cert in self.internal_gateway.make_request(resource="gateway_certificate") ] @property def gateway_certificate_request(self): """ A Gateway Certificate request is a gateway certificate that need to be signed internally or externally using external CA :return: GatewayCertificateRequest :rtype: list """ return [ GatewayCertificateRequest.from_href(cert.get('href')) for cert in self.internal_gateway.make_request(resource="gateway_certificate_request") ]
[docs] def generate_certificate( self, common_name, organization="Forcepoint", public_key_algorithm="rsa", signature_algorithm="rsa_sha_512", key_length=2048, signing_ca=None, certificate=None, ): """ Generate an internal gateway certificate used for VPN on this engine. Certificate request should be an instance of VPNCertificate. :param: str common_name: common name for certificate :param: str organization: organization for certificate :param str public_key_algorithm: public key type to use. Valid values rsa, dsa, ecdsa. :param str signature_algorithm: signature algorithm. Valid values dsa_sha_1, dsa_sha_224, dsa_sha_256, rsa_md5, rsa_sha_1, rsa_sha_256, rsa_sha_384, rsa_sha_512, ecdsa_sha_1, ecdsa_sha_256, ecdsa_sha_384, ecdsa_sha_512. (Default: rsa_sha_512) :param int key_length: length of key. Key length depends on the key type. For example, RSA keys can be 1024, 2048, 3072, 4096. See SMC documentation for more details. :param str,VPNCertificateCA signing_ca: by default will use the internal RSA CA :param str, certificate : used directly call another _create_from_cert :raises CertificateError: error generating certificate :return: GatewayCertificate """ if certificate: return GatewayCertificate._create_from_cert(self, signing_ca, certificate) else: return GatewayCertificate._create( self, common_name, organization, public_key_algorithm, signature_algorithm, key_length, signing_ca )
def __repr__(self): return "VPN(name={})".format(self.name)
[docs] class InternalGateway(SubElement): """ InternalGateway represents the VPN Client configuration endpoint on the NGFW. Settings under Internal Gateway reflect client settings such as requiring antivirus, windows firewall and setting the VPN client mode. View settings through an engine reference:: >>> engine = Engine('dingo') >>> vpn = engine.vpn >>> vpn.name u'dingo Primary' >>> vpn.vpn_client.firewall False >>> vpn.vpn_client.antivirus False >>> vpn.vpn_client.vpn_client_mode u'ipsec' Introduced all_vpns property to get list all vpn instances, Each vpn instance associated only one internal gateway to make code backward compatible. >>> list_of_all_internal_gateways=engine.all_vpns >>> first_vpn_instance= list_of_all_internal_gateways[0] >>> first_vpn_instance.name u'dingo Primary' >>> first_vpn_instance.vpn_client.firewall False >>> first_vpn_instance.vpn_client.antivirus False >>> first_vpn_instance.vpn_client.vpn_client_mode u'ipsec' Enable client AV and windows FW:: engine.vpn.vpn_client.update( firewall=True, antivirus=True) :ivar bool firewall: require windows firewall :ivar bool antivirus: require client antivirus :ivar str vpn_client_mode: """ typeof = "internal_gateway" def rename(self, name): self._del_cache() # Engine update changes this ETag self.update(name="{} Primary".format(name))
[docs] def remove(self): """ Remove Internal Gateway from this engine. """ self.delete()
@property def internal_endpoint(self): """ Internal endpoints to enable VPN for the engine. :rtype: SubElementCollection(InternalEndpoint) """ return sub_collection(self.get_relation("internal_endpoint"), InternalEndpoint)
[docs] class InternalEndpoint(SubElement): """ An Internal Endpoint is an interface mapping that enables VPN on the associated interface. This also defines what type of VPN to enable such as IPSEC, SSL VPN, or SSL VPN Portal. To see all available internal endpoint (VPN gateways) on a particular engine, use an engine reference:: >>> engine = Engine('sg_vm') >>> for e in engine.vpn.internal_endpoint: ... print(e) ... InternalEndpoint(name=10.0.0.254) InternalEndpoint(name=172.18.1.254) You can also retrieve an internal endpoint directly and operate on it, for example, enabling it as a VPN endpoint:: engine = Engine('sg_vm') my_interface = engine.vpn.internal_endpoint.get_exact('10.0.0.254') my_interface.update(enabled=True) Multiple attributes can be updated by calling `update`:: my_interface.update(enabled=True,ipsec_vpn=True,force_nat_t=True,ssl_vpn_portal=False,ssl_vpn_tunnel=False) Available attributes: :ivar bool enabled: enable this interface as a VPN endpoint (default: False) :ivar bool nat_t: enable NAT-T (default: False) :ivar bool force_nat_t: force NAT-T (default: False) :ivar bool ssl_vpn_portal: enable SSL VPN portal on the interface (default: False) :ivar bool ssl_vpn_tunnel: enable SSL VPN tunnel on the interface (default: False) :ivar bool ipsec_vpn: enable IPSEC VPN on the interface (default: False) :ivar bool udp_encapsulation: Allow UDP encapsulation (default: False) :ivar str balancing_mode: VPN load balancing mode. Valid options are: 'standby', 'aggregate', 'active' (default: 'active') """ @property def name(self): """ Get the name from deducted name """ return self.data.get("deducted_name") @property def interface_id(self): """ Interface ID for this VPN endpoint :return: str interface id """ return self.physical_interface.interface_id @property def physical_interface(self): """ Physical interface for this endpoint. :rtype: PhysicalInterface """ return PhysicalInterface(href=self.data.get("physical_interface"))
[docs] class VirtualResource(SubElement): """ A Virtual Resource is a container placeholder for a virtual engine within a Master Engine. When creating a virtual engine, each virtual engine must have a unique virtual resource for mapping. The virtual resource has an identifier (vfw_id) that specifies the engine ID for that instance. This is called as a resource of an engine. To view all virtual resources:: list(engine.virtual_resource.all()) Available attributes: :ivar int connection_limit: Maximum number of connections for this virtual engine. 0 means unlimited (default: 0) :ivar bool show_master_nic: Show the master engine NIC id's in the virtual engine. When updating this element, make modifications and call update() """ typeof = "virtual_resource"
[docs] def create( self, name, vfw_id, domain="Shared Domain", show_master_nic=False, connection_limit=0, comment=None, ): """ Create a new virtual resource. Called through engine reference:: engine.virtual_resource.create(....) :param str name: name of virtual resource :param int vfw_id: virtual fw identifier :param str domain: name of domain to install, (default Shared) :param bool show_master_nic: whether to show the master engine NIC ID's in the virtual instance :param int connection_limit: whether to limit number of connections for this instance :return: href of new virtual resource :rtype: str """ allocated_domain = domain_helper(domain) json = { "name": name, "connection_limit": connection_limit, "show_master_nic": show_master_nic, "vfw_id": vfw_id, "comment": comment, "allocated_domain_ref": allocated_domain, } return ElementCreator(self.__class__, json=json, href=self.href)
@property def allocated_domain_ref(self): """ Domain that this virtual engine is allocated in. 'Shared Domain' is is the default if no domain is specified. :: >>> for resource in engine.virtual_resource: ... resource, resource.allocated_domain_ref ... (VirtualResource(name=ve-1), AdminDomain(name=Shared Domain)) (VirtualResource(name=ve-8), AdminDomain(name=Shared Domain)) :return: AdminDomain element :rtype: AdminDomain """ return Element.from_href(self.data.get("allocated_domain_ref"))
[docs] def set_admin_domain(self, admin_domain): """ Virtual Resources can be members of an Admin Domain to provide delegated administration features. Assign an admin domain to this resource. Admin Domains must already exist. :param str,AdminDomain admin_domain: Admin Domain to add :return: None """ admin_domain = element_resolver(admin_domain) self.data["allocated_domain_ref"] = admin_domain
@property def vfw_id(self): """ Read-Only virtual engine identifier. This is unique per virtual engine and is set when the virtual resource is created. :return: vfw id :rtype: int """ return self.data.get("vfw_id")
[docs] class LBFilter(NestedDict): """ This represents the Load Balancing Filter. """ def __init__(self, action, ip_descriptor, replace_ip, nodeid, ignore_other=False, nat_enforce=False, use_ipsec=False, use_ports=False): data = {"action": action, "ignore_other": ignore_other, "ip_descriptor": ip_descriptor, "nat_enforce": nat_enforce, "nodeid": nodeid, "replace_ip": replace_ip, "use_ipsec": use_ipsec, "use_ports": use_ports} super(LBFilter, self).__init__(data=data) @classmethod def create(cls, nodeid, ip_descriptor, replace_ip, action="replace", ignore_other=False, nat_enforce=False, use_ipsec=False, use_ports=False): """ Create a LB Filter. :param int nodeid: Node Id in case of node action :param str ip_descriptor: Represents the IPNetwork or the IPAddressRange :param str replace_ip: Address in case of replace action. :param str action: Action for the filter. possible values are: none, replace, node, select_none, replace_offset :param bool ignore_other: Tell that other entries might not be concerned. :param bool nat_enforce: Tells NAT to enforce translated packet headers to the same hash value to the matching packet. :param bool use_ipsec: Tells the engine that this entry has to be handled with special care because part of VPN :param bool use_ports: Defines whether to use port numbers when calculating the hash value for the packet. :rtype: LBFilter """ return LBFilter(action=action, ignore_other=ignore_other, ip_descriptor=ip_descriptor, nat_enforce=nat_enforce, nodeid=nodeid, replace_ip=replace_ip, use_ipsec=use_ipsec, use_ports=use_ports) @property def action(self): """ Action for the filter. possible values are: none, replace, node, select_none, replace_offset :rtype: str """ return self.get("action") @action.setter def action(self, value): self.update(action=value) @property def ip_descriptor(self): """ Represents the IPNetwork or the IPAddressRange :rtype: str """ return self.get("ip_descriptor") @ip_descriptor.setter def ip_descriptor(self, value): self.update(ip_descriptor=value) @property def replace_ip(self): """ Address in case of replace action. :rtype: str """ return self.get("replace_ip") @replace_ip.setter def replace_ip(self, value): self.update(replace_ip=value) @property def use_ipsec(self): """ Tells the engine that this entry has to be handled with special care because part of VPN :rtype: bool """ return self.get("use_ipsec") @use_ipsec.setter def use_ipsec(self, value): self.update(use_ipsec=value) @property def use_ports(self): """ Defines whether to use port numbers when calculating the hash value for the packet. :rtype: bool """ return self.get("use_ports") @use_ports.setter def use_ports(self, value): self.update(use_ports=value) @property def nat_enforce(self): """ Tells NAT to enforce translated packet headers to the same hash value to the matching packet. :rtype: bool """ return self.get("nat_enforce") @nat_enforce.setter def nat_enforce(self, value): self.update(nat_enforce=value) @property def ignore_other(self): """ Tell that other entries might not be concerned. :rtype: bool """ return self.get("ignore_other") @ignore_other.setter def ignore_other(self, value): self.update(ignore_other=value)
[docs] class IdleTimeout(NestedDict): """ This is definition of timeout by protocol or by TCP connection state. You can define general timeouts for removing idle connections from the state table, including non-TCP communications that are handled like connections. The timeout prevents wasting engine resources on storing information about abandoned connections. Timeouts are a normal way to clear traffic information with protocols that have no closing mechanism.Timeouts do not affect active connections. The connections are kept in the state table as long as the interval of packets within a connection is shorter than the timeouts set. """ default_protocol_values = {"tcp": 1800, "udp": 50, "icmp": 5, "other": 180, "tcp_closing": 60, "tcp_syn_seen": 15, "tcp_fin_wait_1": 60, "tcp_fin_wait_2": 60, "tcp_time_wait": 60, "tcp_close_wait": 60, "tcp_last_ack": 10, "tcp_syn_ack_seen": 15, "tcp_time_wait_ack": 60, "tcp_closing_ack": 60, "tcp_close_wait_ack": 60, "tcp_last_ack_wait": 10, "tcp_syn_fin_seen": 15, "tcp_syn_return": 15, "ipsec_established": 72 } def __init__(self, engine): ars = {"connection_timeout": engine.data.get("connection_timeout", {})} super(IdleTimeout, self).__init__(data=ars)
[docs] def add(self, name, timeout=None): """ Add a timeout setting for the new protocol. :param str name: name of the protocol. :param int timeout: timeout value. """ if name in self.default_protocol_values: if not timeout: timeout = self.default_protocol_values[name] self.data.get('connection_timeout').append({'protocol': name, 'timeout': timeout}) else: raise ("Invalid protocol found : {}".format(name))
[docs] def remove(self, name): """ Remove the timeout setting for specific protocols on the engine. :param str name: namr of the protocol to be removed. """ for protocol in self.data.get('connection_timeout'): if name == protocol['protocol']: self.data['connection_timeout'].remove(protocol) break
def _contains(self, name): """ Check if specific protocol settings are present in the engine. :param str name: name of protocol. """ for protocol in self.data.get('connection_timeout'): if name == protocol['protocol']: return True return False
[docs] class LocalLogStorageSettings(NestedDict): """ Local Log Storage Settings for not virtual engines. """ def __init__(self, engine): ars = engine.data.get("local_log_storage", {}) super(LocalLogStorageSettings, self).__init__(data=ars) @property def lls_guaranteed_free_percent(self): """ Minimum amount of spool space that must be left available for other uses in percentage """ return self.data.get("lls_guaranteed_free_percent") @property def lls_guaranteed_free_size_in_mb(self): """ Minimum amount of spool space that must be left available for other uses in MegaBytes """ return self.data.get("lls_guaranteed_free_size_in_mb") @property def lls_max_time(self): """ The maximum amount of hours before the stored logs are deleted. """ return self.data.get("lls_max_time") @property def local_log_storage_activated(self): """ Activate the Local Log Storage feature. At least one of the Guaranteed free disk partition values must be set up. """ return self.data.get("local_log_storage_activated")
[docs] class HAForSingleEngine(object): """ High availability configuration on the single engine. Access through an engine reference:: engine.ha_settings.ha_backup_unit engine.ha_settings.set_ha_backup_unit(pos='1234567890-1234567890') engine.ha_settings.disable_ha_backup_unit() engine.ha_settings.connection_sync_mode engine.ha_settings.set_connection_sync(ha_connection_sync_interface=0, connection_sync_group=connection_sync_group_name) engine.ha_settings.disable_connection_sync() When making changes to the HA configuration, any methods called that change the configuration also require that engine.update() is called once changes are complete. This way you can make multiple changes without refreshing the engine cache. :ivar ConnectionSynchronizationGroup connection_sync_group: ConnectionSynchronizationGroup reference for this engine. Applicable when HA mode is connection_sync. """ connection_sync_group = ElementRef("connection_sync_group") def __init__(self, data=None): self.data = data if data else ElementCache() @property def ha_backup_unit_mode(self): """ Get high availability backup unit mode for this engine :return: str or None """ return self.data.get("ha_backup_unit_mode") @property def connection_sync_mode(self): """ Get connection synchronization for external high availability mode for this engine :return: str or None """ return self.data.get("connection_sync_mode") @property def pos(self): """ proof of serial for hugh availability. Applicable when HA backup unit mode is enabled. :rtype: str or None """ return self.data.get("ha_pos_for_backup_unit") @property def sync_interface(self): """ ID of the interface for unicast state Applicable when HA mode is connection_sync. :rtype: int or None """ return self.data.get("ha_connection_sync_interface")
[docs] def disable_ha_backup_unit(self): """ Disable HA backup unit mode on this engine. :return: None """ self.data.update(ha_backup_unit_mode='disabled')
[docs] def disable_connection_sync(self): """ Disable connection synchronization for external high availability on this engine. :return: None """ self.data.update(connection_sync_mode='disabled')
[docs] def set_ha_backup_unit(self, pos=None): """ Enable high availability backup unit on this engine. Disable connection synchronization for external high availability. :param str pos: proof of serial for pairing :return: None """ self.data.update(ha_backup_unit_mode='enabled', ha_pos_for_backup_unit=pos, connection_sync_mode='disabled')
[docs] def set_connection_sync(self, connection_sync_group, ha_connection_sync_interface): """ Enable connection synchronization for external high availability on this engine. Disable high availability backup unit. :param str,ConnectionSynchronizationGroup connection_sync_group: ConnectionSynchronizationGroup element or str href :param str ha_connection_sync_interface: ID of the interface for unicast state synchronization :raises ElementNotFound: ConnectionSynchronizationGroup not found :return: None """ if isinstance(connection_sync_group, str): sync_group = ConnectionSynchronizationGroup(connection_sync_group).href else: sync_group = connection_sync_group.href self.data.update(connection_sync_mode='enabled', connection_sync_group=sync_group, ha_connection_sync_interface=ha_connection_sync_interface, ha_backup_unit_mode='disabled')