Source code for smc.policy.rule_elements

#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.

from smc.base.model import Element, ElementCreator
from smc.base.structs import NestedDict
from smc.api.exceptions import ElementNotFound, InvalidRuleValue, InvalidLogSeverity
from smc.base.util import element_resolver
from smc.compat import is_api_version_less_than_or_equal, \
    is_api_version_less_than, is_smc_version_less_than


[docs] class RuleElement(object): """ Rule Element encapsulates actions for source, destination and service fields. """ @property def is_any(self): """ Is the field set to any :rtype: bool """ return self["any"] if "any" in self else False
[docs] def set_any(self): """ Set field to any """ self.clear() self.update({"any": True})
[docs] def unset_any(self): """ UnSet field to any """ self.clear() self.update({"any": False})
@property def is_none(self): """ Is the field set to none :rtype: bool """ return "none" in self
[docs] def set_none(self): """ Set field to none """ self.clear() self.update({"none": True})
[docs] def add(self, data): """ Add a single entry to field. Entries can be added to a rule using the href of the element or by loading the element directly. Element should be of type :py:mod:`smc.elements.network`. After modifying rule, call :py:meth:`~.save`. Example of adding entry by element:: policy = FirewallPolicy('policy') for rule in policy.fw_ipv4_nat_rules.all(): if rule.name == 'therule': rule.sources.add(Host('myhost')) rule.save() .. note:: If submitting type Element and the element cannot be found, it will be skipped. :param data: entry to add :type data: Element or str """ if self.is_none or self.is_any: self.clear() self.data[self.typeof] = [] try: self.get(self.typeof).append(element_resolver(data)) except ElementNotFound: pass
[docs] def add_many(self, data): """ Add multiple entries to field. Entries should be list format. Entries can be of types relavant to the field type. For example, for source and destination fields, elements may be of type :py:mod:`smc.elements.network` or be the elements direct href, or a combination of both. Add several entries to existing rule:: policy = FirewallPolicy('policy') for rule in policy.fw_ipv4_nat_rules.all(): if rule.name == 'therule': rule.sources.add_many([Host('myhost'), 'http://1.1.1.1/hosts/12345']) rule.save() :param list data: list of sources .. note:: If submitting type Element and the element cannot be found, it will be skipped. """ assert isinstance(data, list), "Incorrect format. Expecting list." if self.is_none or self.is_any: self.clear() self.data[self.typeof] = [] data = element_resolver(data, do_raise=False) self.data[self.typeof] = data
def __eq__(self, other): if isinstance(self, type(other)): if self.is_none or self.is_any: if self.is_none and other.is_none or self.is_any and other.is_any: return True return False if other.is_none or other.is_any: # Current sources not any or none return False if set(self.all_as_href()) == set(other.all_as_href()): return True return False def __ne__(self, other): return not self.__eq__(other) def __str__(self): # fields to display fields_to_display = ("is_none", 'is_any') str = "" for key in fields_to_display: element = self.__getattribute__(key) if isinstance(element, Element): str += element.__str__() else: str += "{} = {}; ".format(key, element) return str
[docs] def update_field(self, elements): """ Update the field with a list of provided values but only if the values are different. Return a boolean indicating whether a change was made indicating whether `save` should be called. If the field is currently set to any or none, then no comparison is made and field is updated. :param list elements: list of elements in href or Element format to compare to existing field :rtype: bool """ changed = False if isinstance(elements, list): if self.is_any or self.is_none: self.add_many(elements) changed = True else: _elements = element_resolver(elements, do_raise=False) if set(self.all_as_href()) ^ set(_elements): self.data[self.typeof] = _elements changed = True if ( changed and self.rule and ( isinstance(self, (Source, Destination)) and self.rule.typeof in ("fw_ipv4_nat_rule", "fw_ipv6_nat_rule") ) ): # Modify NAT cell if necessary self.rule._update_nat_field(self) return changed
[docs] def all_as_href(self): """ Return all elements without resolving to :class:`smc.elements.network` or :class:`smc.elements.service`. Just raw representation as href. :return: elements in href form :rtype: list """ if not self.is_any and not self.is_none: return [element for element in self.get(self.typeof)] return []
[docs] def all(self): """ Return all destinations for this rule. Elements returned are of the object type for the given element for further introspection. Search the fields in rule:: for sources in rule.sources.all(): print('My source: %s' % sources) :return: elements by resolved object type :rtype: list(Element) """ if not self.is_any and not self.is_none: return [Element.from_href(href) for href in self.get(self.typeof)] return []
[docs] class Destination(RuleElement, NestedDict): """ Destination fields for a rule. """ typeof = "dst" def __init__(self, rule=None): dests = dict(none=True) if not rule else rule.data.get("destinations") self.rule = rule super(Destination, self).__init__(data=dests) def __str__(self): return "ANY={}, NONE={}, DESTINATIONS={}".format(self.is_any, self.is_none, self.dst) @property def dst(self): """ All elements corresponding to the matching criteria (if not any or none). :return: list value: source elements """ elements = self.get("dst") return [Element.from_href(element) for element in elements] \ if elements is not None else None @dst.setter def dst(self, value): self.update(dst=value)
[docs] class Source(RuleElement, NestedDict): """ Source fields for a rule """ typeof = "src" def __init__(self, rule=None): sources = dict(none=True) if not rule else rule.data.get("sources") self.rule = rule super(Source, self).__init__(data=sources) def __str__(self): return "ANY={}, NONE={}, SOURCES={}".format(self.is_any, self.is_none, self.src) @property def src(self): """ All elements corresponding to the matching criteria (if not any or none). :return: list value: source elements """ elements = self.get("src") return [Element.from_href(element) for element in elements]\ if elements is not None else None @src.setter def src(self, value): self.update(src=value)
[docs] class Service(RuleElement, NestedDict): """ Service fields for a rule """ typeof = "service" def __init__(self, rule=None): services = dict(none=True) if not rule else rule.data.get("services") self.rule = rule super(Service, self).__init__(data=services) def __str__(self): return "ANY={}, NONE={}, SERVICES={}".format(self.is_any, self.is_none, self.service) @property def service(self): """ All elements corresponding to the matching criteria (if not any or none). :return: list value: source elements """ elements = self.data.get("service") return [Element.from_href(element) for element in elements] \ if elements is not None else None @service.setter def service(self, value): self.update(service=value)
class ActionMixin(NestedDict): """ This represents the common actions associated with the rule. """ def __init__(self, rule=None): if rule is None: if is_api_version_less_than_or_equal("6.5"): action = dict(action="allow") else: action = dict(action=["allow"]) conn_tracking = ConnectionTracking() action.update(connection_tracking_options=conn_tracking.data) action.update(scan_detection="undefined") else: action = rule.data.get("action", {}) super(ActionMixin, self).__init__(data=action) @property def action(self): """ Action set for this rule Since SMC 6.6 actions have to be in list format whereas in SMC < 6.6 they were string. :param str|list value: allow\\|discard\\|continue\\|refuse\\|jump\\|apply_vpn \\|enforce_vpn\\|forward_vpn\\|block_list\\|forced_next_hop :rtype: str|list """ return self.get("action") @action.setter def action(self, value): if not is_api_version_less_than("7.0"): if isinstance(value, str) and value == "blacklist": value = "block_list" if isinstance(value, list): # replace blacklist in action list value = [action.replace("blacklist", "block_list") for action in value] self.update(action=value) @property def connection_tracking_options(self): """ Enables connection tracking. The firewall allows or discards packets according to the selected Connection Tracking mode. Reply packets are allowed as part of the allowed connection without an explicit Access rule. Protocols that use a dynamic port assignment must be allowed using a Service with the appropriate Protocol Agent for that protocol (in Access rules and NAT rules). :rtype: ConnectionTracking """ return ConnectionTracking(self)
[docs] class Action(ActionMixin): """ This represents the action associated with the rule. """ @property def decrypting(self): """ .. versionadded:: 0.6.0 Requires SMC version >= 6.3.3 Whether the decryption is enabled on this rule. :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get("decrypting") @decrypting.setter def decrypting(self, value): self.update(decrypting=value) @property def deep_inspection(self): """ Selects traffic that matches this rule for checking against the Inspection Policy referenced by this policy. Traffic is inspected as the Protocol that is attached to the Service element in this rule. :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get("deep_inspection") @deep_inspection.setter def deep_inspection(self, value): self.update(deep_inspection=value) @property def network_application_latency_monitoring(self): """ Enable or Disable the Application Health Monitoring for the matching Traffic :param bool value: True, False, None (inherit from continue rule) in 7.0 :param str value: true, false, probing, None (inherit from continue rule) in 7.1 and above :rtype: bool """ return self.get("network_application_latency_monitoring") @network_application_latency_monitoring.setter def network_application_latency_monitoring(self, value): # Added condition to fix SMC-46648 if not is_smc_version_less_than("7.0"): self.update(network_application_latency_monitoring=value) @property def forced_next_hop_ip(self): """ If action includes forced_next_hop, specify the forced next hop IP address. :rtype: str ip address """ return self.get("forced_next_hop_ip") @forced_next_hop_ip.setter def forced_next_hop_ip(self, value): if not is_smc_version_less_than("7.1"): self.update(forced_next_hop_ip=value) @property def forced_next_hop_element(self): """ If action includes forced_next_hop, specify the forced next hop element. :rtype: NetworkElement """ if "forced_next_hop_element" in self: return Element.from_href(self.get("forced_next_hop_element")) @forced_next_hop_element.setter def forced_next_hop_element(self, value): if not is_smc_version_less_than("7.1"): self.update(forced_next_hop_element=element_resolver(value)) @property def file_filtering(self): """ (IPv4 Only) Inspects matching traffic against the File Filtering policy. Selecting this option should also activates the Deep Inspection option. You can further adjust virus scanning in the Inspection Policy. :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get("file_filtering") @file_filtering.setter def file_filtering(self, value): self.update(file_filtering=value) @property def dos_protection(self): """ Enable or disable DOS protection mode :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get("dos_protection") @dos_protection.setter def dos_protection(self, value): self.update(dos_protection=value) @property def antispam(self): """ Enable or disable anti-spam :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get("antispam") @antispam.setter def antispam(self, value): self.update(antispam=value) @property def scan_detection(self): """ Enable or disable Scan Detection for traffic that matches the rule. This overrides the option set in the Engine properties. Enable scan detection on this rule:: for rule in policy.fw_ipv4_access_rules.all(): rule.action.scan_detection = 'on' :param str value: on\\|off\\|undefined :return: scan detection setting (on,off,undefined) :rtype: str """ return self.get("scan_detection") @scan_detection.setter def scan_detection(self, value): if value in ("on", "off", "undefined"): self.update(scan_detection=value) @property def sub_policy(self): """ Sub policy is used when ``action=jump``. :rtype: FirewallSubPolicy """ if "sub_policy" in self: return Element.from_href(self.get("sub_policy")) @sub_policy.setter def sub_policy(self, value): self.update(sub_policy=element_resolver(value)) @property def valid_block_lister(self): """ Used when ``action=block_list``. If specified with block_list action, you want to restrict allowed block listers for this rule. Block list entries are only accepted from the components you specify (and from the engine command line). NGFW Engines are always allowed to add entries to their own block lists. If not specified with block_list action, any block list entries are accepted from all components. :rtype: list(Engine) .. note:: This method requires SMC version >= 7.0 """ if "valid_block_lister" in self: return [Element.from_href(element) for element in self.data.get("valid_block_lister")] @valid_block_lister.setter def valid_block_lister(self, values): self.update(valid_block_lister=values) @property def valid_blacklister(self): """ Used when ``action=blacklist``. If specified with blacklist action, you want to restrict allowed block listers for this rule. Black list entries are only accepted from the components you specify (and from the engine command line). NGFW Engines are always allowed to add entries to their own block lists. If not specified with blacklist action, any black list entries are accepted from all components. :rtype: list(Engine) .. note:: This method requires SMC version < 7.0 """ if "valid_blacklister" in self: return [Element.from_href(element) for element in self.data.get("valid_blacklister")] @valid_blacklister.setter def valid_blacklister(self, values): if is_api_version_less_than("7.0"): self.update(valid_blacklister=values) else: self.update(valid_block_lister=values) @property def user_response(self): """ Read-only user response setting """ return self.get("user_response") @property def vpn(self): """ Return vpn reference. Only used if 'enforce_vpn', 'apply_vpn', or 'forward_vpn' is the action type. :param PolicyVPN value: set the policy VPN for VPN action :rtype: PolicyVPN """ if "vpn" in self: return Element.from_href(self.get("vpn")) @vpn.setter def vpn(self, value): self.update(vpn=element_resolver(value)) @property def mobile_vpn(self): """ Mobile VPN only applies to engines that support VPN and that have the action of 'enforce_vpn', 'apply_vpn' or 'forward_vpn' set. This will enable mobile VPN traffic on this VPN rule. :param boolean value: set mobile vpn on or off :rtype: boolean """ return self.get("mobile_vpn") @mobile_vpn.setter def mobile_vpn(self, value): self.update(mobile_vpn=value)
[docs] class ConnectionTracking(NestedDict): """ Connection tracking settings can be configured on a per rule basis to control settings such as enforced MSS and how to handle connection states. Configuring a rule to enable MSS and set connection state tracking to normal:: for rule in policy.fw_ipv4_access_rules.all(): rule.action.connection_tracking_options.mss_enforced = True rule.action.connection_tracking_options.state = 'normal' rule.action.connection_tracking_options.mss_enforced_min_max = (1400, 1450) rule.action.connection_tracking_options.sync_connections = True rule.save() """ def __init__(self, rule=None): if rule is None: ct = dict(mss_enforced=False, mss_enforced_max=0, mss_enforced_min=0, timeout=-1) else: ct = rule.data.get("connection_tracking_options", {}) super(ConnectionTracking, self).__init__(data=ct) @property def mss_enforced(self): """ Is MSS enforced :param bool value: True, False :return: bool """ return self.get("mss_enforced") @mss_enforced.setter def mss_enforced(self, value): self.update(mss_enforced=value) @property def mss_enforced_min_max(self): """ Allows entering the Minimum and Maximum value for the MSS in bytes. Headers are not included in the MSS value; MSS concerns only the payload portion of the packet. :param tuple int value: tuple containing (min, max) in bytes :return: (min, max) values :rtype: tuple """ return (self.get("mss_enforced_min"), self.get("mss_enforced_max")) @mss_enforced_min_max.setter def mss_enforced_min_max(self, value): if isinstance(value, tuple): minimum, maximum = value self.update(mss_enforced_min=minimum) self.update(mss_enforced_max=maximum) @property def state(self): """ Connection tracking mode. See documentation for more info. :param str value: no,loose,normal,strict :return: str """ return self.get("state") @state.setter def state(self, value): self.update(state=value) @property def timeout(self): """ The timeout (in seconds) after which inactive connections are closed. This timeout only concerns idle connections. Connections are not cut because of timeouts while the hosts are still communicating. :param int value: time in seconds :return: int """ return self.get("timeout") @timeout.setter def timeout(self, value): """ Set the idle timeout for connections in seconds :param int value: idle connection timeout """ self.update(timeout=value) @property def sync_connections(self): """ Are sync connections enabled for this engine. If None, then this is set to inherit from a continue rule. :return True, False, None (inherit from continue rule) """ return self.get("sync_connections") @sync_connections.setter def sync_connections(self, value): self.update(sync_connections=value)
[docs] class LogOptions(NestedDict): """ Log Options represent the settings related to per rule logging. Example of obtaining a rule reference and turning logging on for a particular rule:: policy = FirewallPolicy('smcpython') for rule in policy.fw_ipv4_access_rules.all(): if rule.name == 'foo': rule.options.log_accounting_info_mode = True rule.options.log_level = 'stored' rule.options.application_logging = 'enforced' rule.options.user_logging = 'enforced' rule.save() """ def __init__(self, rule=None): if rule is None: logopts = dict( log_accounting_info_mode=False, log_closing_mode=True, log_level="undefined", log_payload_excerpt=False, log_payload_record=False, log_severity=-1, ) else: logopts = rule.data.get("options", {}) super(LogOptions, self).__init__(data=logopts) @property def application_logging(self): """ Stores information about Application use. You can log application use even if you do not use Applications for access control. :param str value: off\\|default\\|enforced :return: str """ return self.get("application_logging") @application_logging.setter def application_logging(self, value): if value in ("off", "default", "enforced"): self.update(application_logging=value) @property def url_category_logging(self): """ Stores information about URL categories use. :param str value: off\\|default\\|enforced :return: str """ return self.get("url_category_logging") @url_category_logging.setter def url_category_logging(self, value): if value in ("off", "default", "enforced"): self.update(url_category_logging=value) @property def endpoint_executable_logging(self): """ Stores information about EndPoint Executable use. You can log EndPoint Executable use even if you do not use EndPoint Executables for accesscontrol. :param str value: off\\|default\\|enforced :return: str """ return self.get("eia_executable_logging") @endpoint_executable_logging.setter def endpoint_executable_logging(self, value): if value in ("off", "default", "enforced"): self.update(eia_executable_logging=value) @property def log_accounting_info_mode(self): """ Both connection opening and closing are logged and information on the volume of traffic is collected. This option is not available for rules that issue alerts. If you want to create reports that are based on traffic volume, you must select this option for all rules that allow traffic that you want to include in the reports. :param bool value: log accounting information (bits/bytes transferred) :return: bool """ return self.get("log_accounting_info_mode") @log_accounting_info_mode.setter def log_accounting_info_mode(self, value): self.update(log_accounting_info_mode=value) @property def log_closing_mode(self): """ Specifying False means no log entries are created when connections are closed. True will mean both connection opening and closing are logged, but no information is collected on the volume of traffic. :param bool value: enable/disable accounting data :return: bool """ return self.get("log_closing_mode") @log_closing_mode.setter def log_closing_mode(self, value): self.update(log_closing_mode=value) @property def log_level(self): """ Configure per rule logging. It is recommended to configure an Any/Any/Any/Continue rule in position 1 if global logging is required. This can be used to override any global logging setting. :param str value: none\\|stored\\|transient\\|essential\\|alert\\|undefined :return: str """ return self.get("log_level") @log_level.setter def log_level(self, value): if value in ("none", "stored", "transient", "essential", "alert"): self.update(log_level=value) if not self.log_accounting_info_mode: self.log_accounting_info_mode = True @property def qos_class(self): """ Matching QoS Classes. The QoS Rules are linked to different types of traffic using the QoS Classes. QoS Classes are matched to traffic in the Access rules with the following actions: - Access rules with the Allow action set a QoS Class for traffic that matches the rules. - Access rules with the Continue action set a QoS Class for all subsequent matching rules that have no specific QoS Class defined. - Access rules with the Use VPN action (Firewall only) set a QoS Class for VPN traffic. Incoming VPN traffic may also match a normal Allow rule after decryption. Otherwise, for outgoing traffic, encryption is done after the QoS Policy is checked. For incoming traffic, decryption is done before the QoS Policy is checked. However, if you only want to read and use DSCP markers set by other devices, the QoS Class is assigned according to the rules on the DSCP Match/Mark tab of the QoS Policy. :return: QoSClass """ qos_class_ref = self.get("qos_class") qos_class = None if qos_class_ref: qos_class = Element.from_href(qos_class_ref) return qos_class @qos_class.setter def qos_class(self, value): qos_class = None if value: qos_class = element_resolver(value) self.update(qos_class=qos_class) @property def log_payload_excerpt(self): """ Stores an excerpt of the packet that matched. The maximum recorded excerpt size is 4 KB. This allows quick viewing of the payload in the logs view. :param bool value: collect excerpt or not :return: bool """ return self.get("log_payload_excerpt") @log_payload_excerpt.setter def log_payload_excerpt(self, value): self.update(log_payload_excerpt=value) @property def log_alert(self): """ If Log Level is set to Alert, specifies the Alert that is sent. Specifying different Alerts for different types of rules allows more fine-grained alert escalation policies. :return: AlertElement """ log_alert_ref = self.get("log_alert") log_alert = None if log_alert_ref: log_alert = Element.from_href(log_alert_ref) return log_alert @log_alert.setter def log_alert(self, value): alert = None if value: alert = element_resolver(value) self.update(log_alert=alert) @property def log_compression(self): """ Stores information about Compress Logs. :param str value: off\\|only_access\\|also_inspection :return: str """ return self.get("log_compression") @log_compression.setter def log_compression(self, value): if value in ("off", "only_access", "also_inspection"): self.update(log_compression=value) @property def log_compression_max_burst_size(self): """ If Compress Logs is selected, the max burst size. :return: int. None if not defined. """ return self.get("log_compression_max_burst_size") @log_compression_max_burst_size.setter def log_compression_max_burst_size(self, value): self.update(log_compression_max_burst_size=value) @property def log_compression_max_log_rate(self): """ If Compress Logs is selected, the max log rate. :return: int. None if not defined. """ return self.get("log_compression_max_log_rate") @log_compression_max_log_rate.setter def log_compression_max_log_rate(self, value): self.update(log_compression_max_log_rate=value) @property def log_payload_record(self): """ Records the traffic up to the limit that is set in the Record Length field. :param bool value: True, False :return: bool """ return self.get("log_payload_record") @log_payload_record.setter def log_payload_record(self, value): self.update(log_payload_record=value) @property def log_severity(self): """ Read only log severity level :return: int """ return self.get("log_severity") @log_severity.setter def log_severity(self, value): """ If Log Level is set to Alert, allows you to override the severity defined in the Alert element. :param int value: Severity values are given below. 10: Critical 7: High 4: Low 1: Information """ if value not in [1, 4, 7, 10]: raise InvalidLogSeverity("Invalid log severity, please pass correct log severity.") self.update(log_severity=value) @property def user_logging(self): """ Stores information about Users when they are used as the Source or Destination of an Access rule. You must select this option if you want Users to be referenced by name in log entries, statistics, reports, and user monitoring. Otherwise, only the IP address associated with the User at the time the log was created is stored. :param str value: off\\|default\\|enforced :return: str """ return self.get("user_logging") @user_logging.setter def user_logging(self, value): if value in ("off", "default", "enforced"): self.update(user_logging=value) def __str__(self): return "{}(log_accounting_info_mode={}, log_closing_mode={}, log_level={}," \ " log_payload_excerpt={}, log_payload_record={}, log_severity= {})"\ .format(self.__class__.__name__, self.log_accounting_info_mode, self.log_closing_mode, self.log_level, self.log_payload_excerpt, self.log_payload_record, self.log_severity)
class SourceVpn(NestedDict): type_list = ("normal", "no_vpn", "mobile_vpn") """ This represents The Match VPN Options. The rule matches traffic from specific VPNs. A sub part of Firewall Access Rule used for matching the source VPN. """ def __init__(self, rule=None): if rule is None: match_vpn_options = dict(match_type="no_vpn", match_vpns=[]) else: match_vpn_options = rule.data.get("match_vpn_options", {}) super(SourceVpn, self).__init__(data=match_vpn_options) def __eq__(self, other): if isinstance(other, SourceVpn): if self.match_type != other.match_type: return False for values in "match_vpns": if set(self.data.get(values, [])) != set(other.data.get(values, [])): return False return True return False def __ne__(self, other): return not self.__eq__(other) def __str__(self): return "{}(match_type={} match_vpns={})".format( self.__class__.__name__, self.match_type, self.match_vpns ) @property def match_vpns(self): """ The possible list of match vpns in case of normal match. :return: list value: vpns or None """ vpns = self.get("match_vpns") return [Element.from_href(vpn) for vpn in vpns] if vpns is not None else None @match_vpns.setter def match_vpns(self, value): self.update(match_vpns=value) @property def match_type(self): """ Matches traffic based on the source VPN: normal: the rule only matches traffic from the specified VPNs. no_vpn: the rule only matches non-VPN traffic. mobile_vpn: the rule only matches traffic from IPsec VPN client. :return: str """ return self.get("match_type") @match_type.setter def match_type(self, value): if value in self.type_list: self.update(match_type=value) else: raise InvalidRuleValue( "match_type attribute {} is invalid! Should be part of {}".format( value, self.type_list ) )
[docs] class AuthenticationOptions(NestedDict): """ Authentication options are set on a per rule basis and dictate whether a user requires identification to match. """ def __init__(self, rule=None): if rule is None: auth = dict(methods=[], require_auth=False, users=[]) else: auth = rule.data.get("authentication_options", {}) super(AuthenticationOptions, self).__init__(data=auth) def __eq__(self, other): if isinstance(other, AuthenticationOptions): if self.require_auth != other.require_auth: return False for values in ("users", "methods"): if set(self.data.get(values, [])) != set(other.data.get(values, [])): return False return True return False def __ne__(self, other): return not self.__eq__(other) @property def methods(self): """ Read only authentication methods enabled :return: list value: auth methods enabled """ return [Element.from_href(method) for method in self.get("methods")] @property def require_auth(self): """ Ready only authentication required :return: boolean """ return self.get("require_auth") @property def timeout(self): """ Timeout between authentications :return: int """ return self.get("timeout") @property def users(self): """ List of users required to authenticate :return: list """ return [Element.from_href(user) for user in self.get("users", [])]
class TimeRange(object): """ Represents a time range setting for a given rule. Time ranges can currently be set up to support rules based on starting month and ending month. At that time the rule will be disabled automatically. """ def __init__(self, data): self.data = data @property def day_ranges(self): """ Not Yet Implemented """ pass @property def month_range_start(self): """ Starting month for rule validity. Use this with month_range_end. :param str jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec """ return self.data.get("month_range_start") @month_range_start.setter def month_range_start(self, value): self.data["month_range_start"] = value @property def month_range_end(self): """ Set month end range. Use this with month_range_start. :param str jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec """ return self.data.get("month_range_end") @month_range_end.setter def month_range_end(self, value): self.data["month_range_end"] = value
[docs] class MatchExpression(Element): """ A match expression is used in the source / destination / service fields to group together elements into an 'AND'ed configuration. For example, a normal rule might have a source field that could include network=172.18.1.0/24 and zone=Internal objects. A match expression enables you to AND these elements together to enforce the match requires both. Logically it would be represented as (network 172.18.1.0/24 AND zone Internal). >>> from smc.elements.network import Host, Zone >>> from smc.policy.rule_elements import MatchExpression >>> from smc.policy.layer3 import FirewallPolicy >>> match = MatchExpression.create(name='mymatch', network_element=Host('kali'), zone=Zone('Mail')) >>> policy = FirewallPolicy('smcpython') >>> policy.fw_ipv4_access_rules.create(name='myrule', sources=[match], destinations='any', services='any') 'http://172.18.1.150:8082/6.2/elements/fw_policy/261/fw_ipv4_access_rule/2099740' >>> rule = policy.search_rule('myrule') ... >>> for source in rule[0].sources.all(): ... print(source, source.values()) ... MatchExpression(name=MatchExpression _1491760686976_2) [Zone(name=Mail), Host(name=kali)] Match expressions can also be used on service fields by providing values for service and service ports as follows:: match = MatchExpression.create(name='mymatch', service=ApplicationSituation('FTP'), service_ports='TCPService('Any TCP Service')') If the service match expression requires ANY ports, you can use the string of 'any' to provide this logic:: match = MatchExpression.create(name='mymatch', service=ApplicationSituation('FTP'), service_ports='any') Once the service match expression is created, you can use that in the policy rule:: policy.fw_ipv4_access_rules.create(name='myrule', sources='any', destinations='any', services=[match], action=['discard']) """ typeof = "match_expression"
[docs] @classmethod def create( cls, name, user=None, network_element=None, domain_name=None, zone=None, executable=None, service=None, service_ports="any", ): """ Create a match expression :param str name: name of match expression :param str user: name of user or user group :param Element network_element: valid network element type, i.e. host, network, etc :param DomainName domain_name: domain name network element :param Zone zone: zone to use :param str executable: name of executable or group :param service: any service type, i.e. TCPService, UDPService, ApplicationSituation :param str,Element service_ports: specify the service ports for the given service. Provide 'ANY' as the value if the match expression requires a service/application and ANY ports :raises ElementNotFound: specified object does not exist :return: instance with meta :rtype: MatchExpression """ ref_list = [] if user: pass if network_element: ref_list.append(network_element.href) if domain_name: ref_list.append(domain_name.href) if zone: ref_list.append(zone.href) if executable: pass if service: ref_list.append(service.href) json = {"name": name, "ref": ref_list} if service: try: json.setdefault("ref", []).append(service_ports.href) # Assume Element except AttributeError: json.update(service_port_selection="any") return ElementCreator(cls, json)
def values(self): return [Element.from_href(ref) for ref in self.data.get("ref")]
class SituationMatchPart(RuleElement, NestedDict): """ situations fields for a rule """ typeof = "situation" def __init__(self, rule=None): situations = dict(none=True) if not rule else rule.data.get("situations") self.rule = rule super(SituationMatchPart, self).__init__(data=situations) def __str__(self): return "ANY={}, NONE={}, SITUATIONS={}".format(self.is_any, self.is_none, self.situations) @property def situation(self): """ All elements corresponding to the matching criteria (if not any or none). :return: list value: situation elements """ elements = self.get("situation") return [Element.from_href(element) for element in elements] \ if elements is not None else None @situation.setter def situation(self, value): self.update(situation=value) class FileFilteringRuleAction(ActionMixin): @property def rematch_archive_content(self): """ Is Rematch Archive Content enabled? :rtype: bool """ return self.get("rematch_archive_content") @rematch_archive_content.setter def rematch_archive_content(self, value): self.update(rematch_archive_content=value) @property def file_reputation(self): """ Is File Reputation enabled? :rtype: bool """ return self.get("file_reputation") @file_reputation.setter def file_reputation(self, value): self.update(file_reputation=value) @property def antivirus(self): """ Is Anti-Malware enabled? :rtype: bool """ return self.get("antivirus") @antivirus.setter def antivirus(self, value): self.update(antivirus=value) @property def sandbox(self): """ Is Sandbox enabled? :rtype: bool """ return self.get("sandbox") @sandbox.setter def sandbox(self, value): self.update(sandbox=value) @property def sandbox_delay_file_transfer(self): """ Is Sandbox Delay enabled? Delay file transfer until the analysis results are received :rtype: bool """ return self.get("sandbox_delay_file_transfer") @sandbox_delay_file_transfer.setter def sandbox_delay_file_transfer(self, value): self.update(sandbox_delay_file_transfer=value) @property def sandbox_allow_level(self): """ Sandbox allow level. :rtype: str """ return self.get("sandbox_allow_level") @sandbox_allow_level.setter def sandbox_allow_level(self, value): self.update(sandbox_allow_level=value) @property def file_reputation_allow_level(self): """ File reputation allow level. :rtype: str """ return self.get("file_reputation_allow_level") @file_reputation_allow_level.setter def file_reputation_allow_level(self, value): self.update(file_reputation_allow_level=value) @property def file_reputation_discard_level(self): """ File reputation discard level. :rtype: str """ return self.get("file_reputation_discard_level") @file_reputation_discard_level.setter def file_reputation_discard_level(self, value): self.update(file_reputation_discard_level=value) @property def spooling_level(self): """ Spooling level. :rtype: str """ return self.get("spooling_level") @spooling_level.setter def spooling_level(self, value): self.update(spooling_level=value) @property def dirty_log_level(self): """ Dirty log level. :rtype: str """ return self.get("dirty_log_level") @dirty_log_level.setter def dirty_log_level(self, value): self.update(dirty_log_level=value) @property def default_behavior(self): """ Default behavior. :rtype: str """ return self.get("default_behavior") @default_behavior.setter def default_behavior(self, value): self.update(default_behavior=value) @property def icap_dlp_scan_enabled(self): """ Is ICAP DLP Scan enabled? :rtype: bool """ return self.get("icap_dlp_scan_enabled") @icap_dlp_scan_enabled.setter def icap_dlp_scan_enabled(self, value): self.update(icap_dlp_scan_enabled=value) @property def icap_dlp_service_fail_action(self): """ ICAP DLP Scan service fail action. :rtype: str """ return self.get("icap_dlp_service_fail_action") @icap_dlp_service_fail_action.setter def icap_dlp_service_fail_action(self, value): self.update(icap_dlp_service_fail_action=value) @property def icap_dlp_file_size_exceeded_action(self): """ ICAP DLP file size exceeded action. :rtype: str """ return self.get("icap_dlp_file_size_exceeded_action") @icap_dlp_file_size_exceeded_action.setter def icap_dlp_file_size_exceeded_action(self, value): self.update(icap_dlp_file_size_exceeded_action=value) @property def icap_dlp_max_file_size(self): """ ICAP DLP max file size. :rtype: int """ return self.get("icap_dlp_max_file_size") @icap_dlp_max_file_size.setter def icap_dlp_max_file_size(self, value): self.update(icap_dlp_max_file_size=value)