Source code for smc.policy.rule

#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.
"""
The rule module provides the base classes for all access control and NAT rules.

::

    Policy (base)
          |
    FirewallPolicy ----> fw_ipv4_access_rules
                                 |
                                 |
                    IPv4Rule / IPv4NATRule (smc.policy.rule.Rule)
                       |            |
                        ------------
                            |
                           name
                           comment
                           sources (smc.policy.rule_elements.Source)
                           destinations (smc.policy.rule_elements.Destination)
                           services (smc.policy.rule_elements.Service)
                           action (smc.policy.rule_elements.Action)
                           authentication_options (smc.policy.rule_elements.AuthenticationOptions)
                           match_vpn_options (smc.policy.rule_elements.SourceVpn)
                           is_disabled
                           disable
                           enable
                           options (smc.policy.rule_elements.LogOptions)
                                    parent_policy
                                    tag
                                     ...

Examples of rule operations::

    >>> from smc.policy.layer3 import FirewallPolicy
    >>> from smc.policy.rule_elements import LogOptions
    >>> from smc.policy.rule_elements import Action
    >>> from smc.elements.other import Alias
    ...
    >>> options = LogOptions()
    >>> options.log_accounting_info_mode=True
    >>> options.log_level='stored'
    ...
    >>> policy = FirewallPolicy('AWS_Default')
    >>> options = LogOptions()
    >>> options.log_accounting_info_mode=True
    >>> options.log_level='stored'
    >>> policy.fw_ipv4_access_rules.create(name='mylogrule',services='any',sources='any',
                                           destinations='any',actions='continue',
                                           log_options=options)
    'http://172.18.1.150:8082/6.2/elements/fw_policy/272/fw_ipv4_access_rule/2099703'
    ...
    >>> actions = Action()
    >>> actions.deep_inspection = True
    >>> actions.file_filtering=False
    >>> actions.network_application_latency_monitoring=False
    ...
    >>> policy.fw_ipv4_access_rules.create(name='outbound',sources=[Alias('$$ Interface ID 1.net')],
                                           destinations='any',services='any',
                                           action=actions,log_options=options)
    'http://172.18.1.150:8082/6.2/elements/fw_policy/272/fw_ipv4_access_rule/2099704'
    >>> for rule in policy.fw_ipv4_access_rules.all():
    ...   print(rule)
    ...
    IPv4Rule(name=outbound)
    IPv4Rule(name=mylogrule)
    ...
    >>> policy.search_rule('outbound')
    [IPv4Rule(name=outbound)]
    ...
    >>> policy.fw_ipv4_access_rules.create(name='discard at bottom', sources='any',
                                           destinations='any',services='any',
                                           action='discard',add_pos=50)
    'http://172.18.1.150:8082/6.2/elements/fw_policy/272/fw_ipv4_access_rule/2099705'
    >>> for rule in policy.fw_ipv4_access_rules.all():
    ...   print(rule, rule.name, rule.action.action)
    ...
    IPv4Rule(name=outbound) outbound allow
    IPv4Rule(name=mylogrule) mylogrule allow
    IPv4Rule(name=discard at bottom) discard at bottom discard

.. note:: SMC version >= 6.6.0 requires the use of a list of strings for rule actions
"""
from smc.base.model import Element, SubElement, ElementCreator
from smc.elements.other import LogicalInterface
from smc.api.exceptions import (
    ElementNotFound,
    MissingRequiredInput,
    CreateRuleFailed,
    PolicyCommandFailed,
)
from smc.policy.rule_elements import (
    Action,
    LogOptions,
    Destination,
    Source,
    Service,
    AuthenticationOptions,
    SourceVpn, SituationMatchPart, ActionMixin,
)
from smc.base.util import element_resolver
from smc.base.decorators import cacheable_resource
from smc.core.resource import History
from smc.compat import get_best_version, is_api_version_less_than_or_equal


class Rule(object):
    """
    Top level rule construct with methods required to modify common
    behavior of any rule type. To retrieve a rule, access by reference::

        policy = FirewallPolicy('mypolicy')

        for rule in policy.fw_ipv4_nat_rules.all():
            print(rule.name, rule.comment, rule.is_disabled)

    This class is used as a mixin. It relies on attributes and methods it
    does not define itself (``data``, ``_meta``, ``make_request``,
    ``delete``, ``update_targets``, ``_get_action``); these come from the
    classes it is combined with, e.g.
    ``class IPv4Rule(RuleCommon, Rule, SubElement)``.
    """

    @property
    def name(self):
        """
        Name attribute of rule element. When the rule metadata has no
        name, ``"Rule @<tag>"`` is returned instead.

        :rtype: str
        """
        return self._meta.name or "Rule @%s" % self.tag

    @property
    def history(self):
        """
        .. versionadded:: 0.6.3
            Requires SMC version >= 6.5

        Obtain the history of this element. This will not chronicle every
        modification made over time, but instead a current snapshot with
        historical information such as when the element was created, by
        whom, when it was last modified and its current state.

        :raises ResourceNotFound: If not running SMC version >= 6.5
        :rtype: History
        """
        return History(**self.make_request(resource="history"))

    def move_rule_after(self, other_rule):
        """
        Move this rule after another. This process will make a copy of
        the existing rule and add it after the specified rule. If this
        raises an exception, processing is stopped. Otherwise the original
        rule is then deleted.
        You must re-retrieve the new element after running this operation
        as new references will be created.

        :param other_rule Rule: rule where this rule will be positioned after
        :raises CreateRuleFailed: failed to duplicate this rule, no move
            is made
        """
        self.make_request(
            CreateRuleFailed,
            href=other_rule.get_relation("add_after"),
            method="create",
            json=self,
        )
        # Only reached when the copy succeeded
        self.delete()

    def move_rule_before(self, other_rule):
        """
        Move this rule before another. This process will make a copy of
        the existing rule and add it before the specified rule. If this
        raises an exception, processing is stopped. Otherwise the original
        rule is then deleted.
        You must re-retrieve the new element after running this operation
        as new references will be created.

        :param other_rule Rule: rule where this rule will be positioned before
        :raises CreateRuleFailed: failed to duplicate this rule, no move
            is made
        """
        self.make_request(
            CreateRuleFailed,
            href=other_rule.get_relation("add_before"),
            method="create",
            json=self,
        )
        # Only reached when the copy succeeded
        self.delete()

    @cacheable_resource
    def action(self):
        """
        Action for this rule.

        :rtype: Action
        """
        return Action(self)

    @cacheable_resource
    def options(self):
        """
        Options for this rule.

        :rtype: LogOptions
        """
        return LogOptions(self)

    @cacheable_resource
    def authentication_options(self):
        """
        Read only authentication options field

        :rtype: AuthenticationOptions
        """
        return AuthenticationOptions(self)

    @cacheable_resource
    def match_vpn_options(self):
        """
        Read only match vpn options field

        :rtype: SourceVpn
        """
        return SourceVpn(self)

    @property
    def comment(self):
        """
        Optional comment for this rule.

        :param str value: string comment
        :rtype: str
        """
        return self.data.get("comment")

    @comment.setter
    def comment(self, value):
        self.data["comment"] = value

    @property
    def is_rule_section(self):
        """
        Is this rule considered a rule section. A rule is treated as a
        section when its data has neither a ``sources`` nor a
        ``destinations`` field.

        :rtype: bool
        """
        return not any(field in self.data for field in ("sources", "destinations"))

    @property
    def is_disabled(self):
        """
        Whether the rule is enabled or disabled

        :param bool value: True, False
        :rtype: bool
        """
        return self.data.get("is_disabled")

    def disable(self):
        """
        Disable this rule. Only the local data is changed; call
        :meth:`save` to apply.
        """
        self.data["is_disabled"] = True

    def enable(self):
        """
        Enable this rule. Only the local data is changed; call
        :meth:`save` to apply.
        """
        self.data["is_disabled"] = False

    @cacheable_resource
    def sources(self):
        """
        Sources assigned to this rule

        :rtype: Source
        """
        return Source(self)

    @cacheable_resource
    def destinations(self):
        """
        Destinations for this rule

        :rtype: Destination
        """
        return Destination(self)

    @cacheable_resource
    def services(self):
        """
        Services assigned to this rule

        :rtype: Service
        """
        return Service(self)

    @property
    def parent_policy(self):
        """
        Read-only name of the parent policy

        :return: :class:`smc.base.model.Element` of type policy
        """
        return Element.from_href(self.data.get("parent_policy"))

    def save(self):
        """
        After making changes to a rule element, you must call save
        to apply the changes. Rule changes are made to cache before
        sending to SMC.

        :raises PolicyCommandFailed: failed to save with reason
        :return: href of this rule
        :rtype: str
        """
        return self.update()

    def update(
        self,
        validate=True,
        sources=None,
        destinations=None,
        services=None,
        action=None,
        **kwargs
    ):
        """
        Update a rule.

        :param bool validate: validate the policy before update; default True
        :param sources: source/s for rule
        :type sources: str, list[Element] str can be "any" or json
        :param destinations: destination/s for rule
        :type destinations: str, list[Element] str can be "any" or json
        :param services: service/s for rule
        :type services: str, list[Element] str can be "any" or json
        :param action: action/s for rule
        :type action: str, list[str] since API 6.6, json
        :param kwargs: additional rule fields sent with the update. An
            ``options`` mapping containing ``endpoint_executable_logging``
            has that key renamed to ``eia_executable_logging``.
        :raises PolicyCommandFailed: failed to update with reason
        :return: href of this rule
        :rtype: str
        """
        rule_values = self.update_targets(sources, destinations, services)

        if isinstance(action, dict):
            # JSON is still accepted for the action. Normalise a shallow copy
            # so the dict handed in by the caller is left untouched.
            action_json = dict(action)
            if "action" in action_json:
                current = action_json["action"]
                if is_api_version_less_than_or_equal("6.5"):
                    # API <= 6.5 expects a single action string
                    if isinstance(current, list):
                        action_json["action"] = current[0]
                elif isinstance(current, str):
                    # API >= 6.6 expects a list of action strings
                    action_json["action"] = [current]
            rule_values.update(action=action_json)
        elif action is not None:
            rule_action = self._get_action(action)
            rule_values.update(action=rule_action.data)

        if "options" in kwargs and "endpoint_executable_logging" in kwargs["options"]:
            options = kwargs["options"]
            if isinstance(options, dict):
                # Rename the key on a copy rather than on the caller's dict
                options = kwargs["options"] = dict(options)
            options["eia_executable_logging"] = options.pop("endpoint_executable_logging")

        rule_values.update(kwargs)
        if not validate:
            rule_values.update(params={"validate": False})

        result = super(Rule, self).update(PolicyCommandFailed, **rule_values)
        # Drop cached sub-resources (if any) so they are rebuilt on next access
        try:
            del self._cache
        except AttributeError:
            pass
        return result

    @property
    def tag(self):
        """
        Value of rule tag. Read only.

        :return: rule tag
        :rtype: str
        """
        return self.data.get("tag")
# Disabled ``time_range`` property of ``Rule``, kept for reference:
#
# @property
# def time_range(self):
#     """
#     Time range/s assigned to this rule. May be None if
#     no time range configured.
#     :return: :py:class:`smc.policy.rule_elements.TimeRange`
#     """
#     time_range = self.data.get('time_range')
#     if time_range:
#         return TimeRange(self.data.get('time_range'))


class RuleCommon(object):
    """
    Functionality common to all rules
    """

    def _placement(self, add_pos=None, after=None, before=None):
        """
        Work out where a new entry is to be inserted.

        :return: tuple of (href to post to, query params or None)
        """
        if add_pos is not None:
            return self.add_at_position(add_pos), None
        if before or after:
            return self.href, self.add_before_after(before, after)
        return self.href, None

    def create_rule_section(self, name, add_pos=None, after=None, before=None):
        """
        Create a rule section in a Firewall Policy. To specify a specific
        numbering position for the rule section, use the `add_pos` field.
        If no position or before/after is specified, the rule section will
        be placed at the top which will encapsulate all rules below.
        Create a rule section for the relevant policy::

            policy = FirewallPolicy('mypolicy')
            policy.fw_ipv4_access_rules.create_rule_section(name='attop')
            # For NAT rules
            policy.fw_ipv4_nat_rules.create_rule_section(name='mysection', add_pos=5)

        :param str name: create a rule section by name
        :param int add_pos: position to insert the rule, starting with position 1.
            If the position value is greater than the number of rules, the rule is
            inserted at the bottom. If add_pos is not provided, rule is inserted in
            position 1. Mutually exclusive with ``after`` and ``before`` params.
        :param str after: Rule tag to add this rule after. Mutually exclusive with
            ``add_pos`` and ``before`` params.
        :param str before: Rule tag to add this rule before. Mutually exclusive with
            ``add_pos`` and ``after`` params.
        :raises CreateRuleFailed: rule creation failure
        :return: the created rule section, an instance of this rule class
        """
        href, params = self._placement(add_pos, after, before)
        return ElementCreator(
            self.__class__,
            exception=CreateRuleFailed,
            href=href,
            params=params,
            # A section carries only a comment, which holds its name
            json={"comment": name},
        )

    def create_insert_point(
        self, name, insert_point_type="normal", add_pos=None, after=None, before=None
    ):
        """
        Create an insert point in a Template Firewall Policy.
        If no position or before/after is specified, the insert point will
        be placed at the top.
        Create an insert point for the relevant template policy::

            policy = FirewallPolicy('mypolicy')
            policy.fw_ipv4_access_rules.create_insert_point(name="my insert point",
                                                            insert_point_type="normal",
                                                            after=section.tag)

        :param str name: name of the insert point
        :param str insert_point_type: type of the insert point:

            - "automatic": automatic rules insert point
            - "normal": normal insert point
        :param int add_pos: position to insert the rule, starting with position 1.
            If the position value is greater than the number of rules, the rule is
            inserted at the bottom. If add_pos is not provided, rule is inserted in
            position 1. Mutually exclusive with ``after`` and ``before`` params.
        :param str after: Rule tag to add this insert point after. Mutually
            exclusive with ``before`` and ``add_pos`` params.
        :param str before: Rule tag to add this insert point before. Mutually
            exclusive with ``after`` and ``add_pos`` params.
        :raises CreateRuleFailed: rule creation failure
        :return: the created insert point, an instance of this rule class
        :rtype: IPv4Rule, IPv6Rule..
        """
        href, params = self._placement(add_pos, after, before)
        return ElementCreator(
            self.__class__,
            exception=CreateRuleFailed,
            href=href,
            params=params,
            json={"name": name, "type": insert_point_type},
        )

    def add_at_position(self, pos):
        """
        Resolve the href to post to so that a new rule lands at position
        ``pos`` (1-based; values <= 0 are treated as 1).

        :param int pos: requested position
        :return: ``add_before`` relation of the rule currently at ``pos``,
            ``add_after`` relation of the last rule when ``pos`` is past the
            end, or this collection's own href when there are no rules
        :rtype: str
        """
        if pos <= 0:
            pos = 1

        rules = self.make_request(href=self.href)
        if rules:
            if len(rules) >= pos:
                # Position somewhere in the list
                for position, entry in enumerate(rules, start=1):
                    if position == pos:
                        return self.__class__(**entry).get_relation("add_before")
            else:
                # Put at the end
                return self.__class__(**rules[-1]).get_relation("add_after")
        return self.href

    def add_before_after(self, before=None, after=None):
        """
        Build the query parameters for a relative insert. ``after`` wins
        when both are given.

        :param str before: rule tag to insert before
        :param str after: rule tag to insert after
        :return: ``{"after": ...}``, ``{"before": ...}`` or None when neither
            is provided
        :rtype: dict or None
        """
        if after is not None:
            return {"after": after}
        if before is not None:
            return {"before": before}
        return None

    @staticmethod
    def _resolve_target(value, element_cls, json_key):
        """
        Turn one user supplied rule target into an ``element_cls`` instance.

        :param value: None, the string "any" (case-insensitive), a json dict,
            an iterable of elements/hrefs, or an ``element_cls`` instance
            (returned unchanged)
        :param element_cls: Source, Destination, Service or SituationMatchPart
        :param str json_key: key holding the entries when ``value`` is json
        """
        if isinstance(value, element_cls):
            return value

        element = element_cls()
        if value is None:
            element.set_none()
            return element

        element.clear()
        if isinstance(value, str) and value.lower() == "any":
            element.set_any()
        elif isinstance(value, dict):
            # still allow json as parameter
            element.unset_any()
            element.add_many(value.get(json_key))
        else:
            element.unset_any()
            element.add_many(value)
        return element

    def update_targets(self, sources=None, destinations=None, services=None, situations=None):
        """
        Build the json for the rule fields that were provided.

        Each argument may be None (field is left out of the result), the
        string "any", a json dict, an iterable of elements/hrefs, or an
        already built rule element of the matching type.

        :return: dict with a ``sources``, ``destinations``, ``services``
            and/or ``situations`` entry for every argument that is not None
        :rtype: dict
        """
        targets = (
            ("sources", sources, Source, "src"),
            ("destinations", destinations, Destination, "dst"),
            ("services", services, Service, "service"),
            ("situations", situations, SituationMatchPart, "situation"),
        )
        e = {}
        for field, value, element_cls, json_key in targets:
            element = self._resolve_target(value, element_cls, json_key)
            if value is not None:
                e[field] = element.data
        return e

    def update_logical_if(self, logical_interfaces):
        """
        Build the ``logical_interfaces`` json for a rule.

        :param list logical_interfaces: logical interface names, or None to
            match any logical interface
        :raises MissingRequiredInput: a named logical interface was not found
        :rtype: dict
        """
        if logical_interfaces is None:
            return {"logical_interfaces": {"any": True}}

        try:
            logicals = [LogicalInterface(interface).href for interface in logical_interfaces]
        except ElementNotFound:
            raise MissingRequiredInput(
                "Cannot find Logical interface specified " ": {}".format(logical_interfaces)
            )
        return {"logical_interfaces": {"logical_interface": logicals}}

    def get_action(self):
        """
        Return action instance.

        :rtype: Action
        """
        return Action()

    def _get_action(self, action):
        """
        Get the action field for a rule. In SMC 6.6 actions have to be in
        list format whereas in SMC < 6.6 they were string.

        :param str,list action: provided action in create constructor
        :rtype: Action
        :raises CreateRuleFailed: invalid rule based on rule
        """
        versioned_method = get_best_version(
            ("6.5", self._get_action_6_5), ("6.6", self._get_action_6_6)
        )
        return versioned_method(action)

    def _validate_action(self, rule_action):
        """
        Check the action(s) on ``rule_action`` against ``self._actions``.

        :raises CreateRuleFailed: an action is not valid for this rule type
        :return: ``rule_action``, unchanged
        """
        chosen = rule_action.action
        if isinstance(chosen, list):
            valid_action = all(_action in self._actions for _action in chosen)
        else:
            valid_action = chosen in self._actions

        if not valid_action:
            raise CreateRuleFailed(
                "Action specified is not valid for this "
                "rule type; action: {}".format(rule_action.action)
            )
        return rule_action

    def _get_action_6_6(self, action):
        """
        Action handling for the 6.6 format: the action is stored as a list
        of strings, so a single string is wrapped into a list.

        :param str,list,ActionMixin action: provided action
        :rtype: Action
        :raises CreateRuleFailed: action not valid for this rule type
        """
        if isinstance(action, ActionMixin):
            rule_action = action
            if isinstance(action.action, str):
                rule_action.action = [action.action]
        else:
            rule_action = self.get_action()
            rule_action.action = [action] if isinstance(action, str) else action
        return self._validate_action(rule_action)

    def _get_action_6_5(self, action):
        """
        Action handling for the 6.5 format: the action is stored as a single
        string, so only the first entry of a list is kept.

        :param str,list,ActionMixin action: provided action
        :rtype: Action
        :raises CreateRuleFailed: action is neither str nor list, or is not
            valid for this rule type
        """
        if isinstance(action, ActionMixin):
            rule_action = action
            if isinstance(action.action, list):
                rule_action.action = action.action[0]
        else:
            rule_action = self.get_action()
            if isinstance(action, str):
                rule_action.action = action
            elif isinstance(action, list):
                rule_action.action = action[0]
            else:
                raise CreateRuleFailed(
                    "Action specified should be a str " "rule type; action: {}".format(action)
                )
        return self._validate_action(rule_action)

    def is_locked(self):
        """
        Locked flag for this rule; None when the rule data has no
        ``locked`` entry.
        """
        return self.data.get("locked", None)

    def lock(self, reason_for=None):
        """
        .. Requires SMC version >= 6.10.10 or >= 7.0.2 or >= 7.1.0

        Locks this rule with an optional reason.

        :param str reason_for: optional reason sent along with the lock
        :raises ResourceNotFound: If not running on supported SMC version
        """
        request = {"method": "update", "resource": "lock"}
        if reason_for:
            request["params"] = {"reason_for": reason_for}
        return self.make_request(**request)

    def unlock(self):
        """
        .. Requires SMC version >= 6.10.10 or >= 7.0.2 or >= 7.1.0

        Unlocks this rule.

        :raises ResourceNotFound: If not running on supported SMC version
        """
        return self.make_request(method="update", resource="unlock")
class IPv4Rule(RuleCommon, Rule, SubElement):
    """
    An IPv4 access rule on a layer 3 engine policy.

    Creating a rule::

        policy = FirewallPolicy('mypolicy')
        policy.fw_ipv4_access_rules.create(name='smcpython',
                                           sources='any',
                                           destinations='any',
                                           services='any')

    Sources and destinations accept any valid network element type from
    :py:class:`smc.elements.network`, given either by href::

        sources=['http://1.1.1.1:8082/elements/network/myelement',
                 'http://1.1.1.1:8082/elements/host/myhost'], etc

    or as elements::

        sources=[Host('myhost'), Network('thenetwork'), AddressRange('range')]

    Services work the same way and take any
    :py:class:`smc.elements.service` type, an element href, or a mix::

        services=[TCPService('myservice'),
                  'http://1.1.1.1/8082/elements/tcp_service/mytcpservice',
                  'http://1.1.1.1/8082/elements/udp_server/myudpservice'], etc

    Services and their hrefs can be looked up through the
    :py:class:`smc.base.collection` collections::

        >>> services = list(TCPService.objects.filter('80'))
        >>> for service in services:
        ...   print(service, service.href)
        ...
        (TCPService(name=tcp80443), u'http://172.18.1.150:8082/6.1/elements/tcp_service/3535')
        (TCPService(name=HTTP to Web SaaS), u'http://172.18.1.150:8082/6.1/elements/tcp_service/589')
        (TCPService(name=HTTP), u'http://172.18.1.150:8082/6.1/elements/tcp_service/440')

    Services by application (all facebook applications)::

        >>> applications = Search.objects.entry_point('application_situation').filter('facebook')
        >>> print(list(applications))
        [ApplicationSituation(name=Facebook-Plugins-Share-Button), ApplicationSituation(name=Facebook-Plugins]
        ...

    The string ``'any'`` is also accepted for sources, destinations and
    services to match everything, for example::

        sources='any'
    """

    typeof = "fw_ipv4_access_rule"

    # Actions accepted by this rule type
    _actions = (
        "allow",
        "discard",
        "continue",
        "refuse",
        "jump",
        "apply_vpn",
        "enforce_vpn",
        "forward_vpn",
        "blacklist",
        "block_list",
        "forced_next_hop",
    )

    def create(
        self,
        name,
        sources=None,
        destinations=None,
        services=None,
        action="allow",
        log_options=None,
        authentication_options=None,
        match_vpn_options=None,
        connection_tracking=None,
        is_disabled=False,
        vpn_policy=None,
        mobile_vpn=False,
        add_pos=None,
        after=None,
        before=None,
        sub_policy=None,
        comment=None,
        validate=True,
        **kw
    ):
        """
        Create a layer 3 firewall rule.

        .. versionchanged:: 0.7.0
            Action field now requires a list of actions as strings when using API
            version >= 6.6

        Example::

            Api version <=6.5 action is a string
            rule_vpn = p.fw_ipv4_access_rules.create(
                name="newrule_vpn",
                sources=[Network("London Internal Network")],
                destinations=[Network("net-172.31.14.0/24")],
                services="any",
                action="apply_vpn",
                vpn_policy=vpn)

            Api version >=6.6 action is a list
            vpn_actions = Action()
            vpn_actions.action = ['allow', 'apply_vpn']
            p.fw_ipv4_access_rules.create(name='',
                                          sources=[Network("London Internal Network")],
                                          destinations=[Network("net-172.31.14.0/24")],
                                          services='any',
                                          action=vpn_actions,
                                          vpn_policy=vpn)

        :param str name: name of rule
        :param sources: source/s for rule
        :type sources: Source, list[str, Element]
        :param destinations: destination/s for rule
        :type destinations: Destination, list[str, Element]
        :param services: service/s for rule
        :type services: Service, list[str, Element]
        :param action: one of the values in ``_actions`` (default: allow)
        :type action: Action,str,list[str]
        :param LogOptions log_options: LogOptions object
        :param AuthenticationOptions authentication_options: options for auth if any
        :param SourceVpn match_vpn_options: rule matches traffic from specific VPNs
        :param ConnectionTracking connection_tracking: custom connection tracking
            settings
        :param bool is_disabled: create the rule in disabled state
        :param PolicyVPN,str vpn_policy: policy element or str href; required for
            the apply_vpn, enforce_vpn and forward_vpn actions unless
            ``mobile_vpn`` is set
        :param bool mobile_vpn: if using a vpn action, set to True and omit
            ``vpn_policy`` to apply the rule to any mobile VPN based on the policy
            VPN associated with the engine
        :param int add_pos: position to insert the rule, starting with position 1.
            If the position value is greater than the number of rules, the rule is
            inserted at the bottom. If add_pos is not provided, rule is inserted in
            position 1. Mutually exclusive with ``after`` and ``before`` params.
        :param str after: Rule tag to add this rule after. Mutually exclusive with
            ``add_pos`` and ``before`` params.
        :param str before: Rule tag to add this rule before. Mutually exclusive with
            ``add_pos`` and ``after`` params.
        :param str,Element sub_policy: sub policy required when rule has an action
            of 'jump'. Can be the FirewallSubPolicy element or href.
        :param str comment: optional comment for this rule
        :param bool validate: validate the inspection policy during rule creation.
            Default: True
        :param kw: extra fields added to the rule json as-is
        :raises MissingRequiredInput: a VPN action was given without
            ``vpn_policy``/``mobile_vpn``, or the VPN policy or sub policy could
            not be found
        :raises CreateRuleFailed: rule creation failure
        :return: the created ipv4 rule
        :rtype: IPv4Rule
        """
        rule_values = self.update_targets(sources, destinations, services)
        rule_action = self._get_action(action)

        vpn_actions = ("apply_vpn", "enforce_vpn", "forward_vpn")
        uses_vpn = any(vpn_action in rule_action.action for vpn_action in vpn_actions)

        if uses_vpn:
            if mobile_vpn:
                rule_action.mobile_vpn = True
            elif vpn_policy is None:
                raise MissingRequiredInput(
                    "You must either specify a vpn_policy or set "
                    "mobile_vpn when using a rule with a VPN action"
                )
            else:
                try:
                    # VPNPolicy element or href
                    rule_action.vpn = element_resolver(vpn_policy)
                except ElementNotFound:
                    raise MissingRequiredInput(
                        "Cannot find VPN policy specified: {}, ".format(vpn_policy)
                    )
        elif "jump" in rule_action.action:
            try:
                rule_action.sub_policy = element_resolver(sub_policy)
            except ElementNotFound:
                raise MissingRequiredInput(
                    "Cannot find sub policy specified: {} ".format(sub_policy)
                )

        # Fall back to default option objects when none (or empty) were given
        log_options = log_options or LogOptions()
        auth_options = authentication_options or AuthenticationOptions()
        match_vpn_data = match_vpn_options.data if match_vpn_options else None

        if connection_tracking is not None:
            rule_action.connection_tracking_options.update(**connection_tracking)

        rule_values.update(
            name=name,
            comment=comment,
            action=rule_action.data,
            options=log_options.data,
            authentication_options=auth_options.data,
            match_vpn_options=match_vpn_data,
            is_disabled=is_disabled,
            **kw
        )

        params = {} if validate else {"validate": False}
        target_href = self.href
        if add_pos is not None:
            target_href = self.add_at_position(add_pos)
        elif before or after:
            params.update(**self.add_before_after(before, after))

        return ElementCreator(
            self.__class__,
            exception=CreateRuleFailed,
            href=target_href,
            params=params,
            json=rule_values,
        )
class IPv4Layer2Rule(RuleCommon, Rule, SubElement):
    """
    IPv4 access rule for Layer 2 Firewall policies.

    Creating an allow-all rule::

        policy = Layer2Policy('mylayer2')
        policy.layer2_ipv4_access_rules.create(name='myrule',
                                               sources='any',
                                               destinations='any',
                                               services='any')
    """

    typeof = "layer2_ipv4_access_rule"

    # Actions accepted by this rule type
    _actions = ("allow", "continue", "discard", "refuse", "jump", "blacklist", "block_list")

    def create(
        self,
        name,
        sources=None,
        destinations=None,
        services=None,
        action="allow",
        is_disabled=False,
        logical_interfaces=None,
        add_pos=None,
        after=None,
        before=None,
        comment=None,
        validate=True,
        sub_policy=None,
        **kw
    ):
        """
        Create an IPv4 Layer 2 Engine rule.

        .. versionchanged:: 0.7.0
            Action field now requires a list of actions as strings when using SMC
            version >= 6.6.0

        :param str name: name of rule
        :param sources: source/s for rule
        :type sources: list[str, Element]
        :param destinations: destination/s for rule
        :type destinations: list[str, Element]
        :param services: service/s for rule
        :type services: list[str, Element]
        :param str, Action action: one of the values in ``_actions``
            (default: allow)
        :param bool is_disabled: whether to disable rule or not
        :param list logical_interfaces: logical interfaces by name; when omitted
            any logical interface matches
        :param int add_pos: position to insert the rule, starting with position 1.
            If the position value is greater than the number of rules, the rule is
            inserted at the bottom. If add_pos is not provided, rule is inserted in
            position 1. Mutually exclusive with ``after`` and ``before`` params.
        :param str after: Rule tag to add this rule after. Mutually exclusive with
            ``add_pos`` and ``before`` params.
        :param str before: Rule tag to add this rule before. Mutually exclusive with
            ``add_pos`` and ``after`` params.
        :param str comment: optional comment for this rule
        :param bool validate: validate the inspection policy during rule creation.
            Default: True
        :param str,Element sub_policy: sub policy required when rule has an action
            of 'jump'. Can be the IPSSubPolicy element or href.
        :param kw: extra fields added to the rule json as-is
        :raises MissingRequiredInput: the sub policy or a logical interface could
            not be found
        :raises CreateRuleFailed: rule creation failure
        :return: newly created rule
        :rtype: IPv4Layer2Rule
        """
        rule_values = self.update_targets(sources, destinations, services)
        rule_action = self._get_action(action)

        if "jump" in rule_action.action:
            try:
                rule_action.sub_policy = element_resolver(sub_policy)
            except ElementNotFound:
                raise MissingRequiredInput(
                    "Cannot find sub policy specified: {} ".format(sub_policy)
                )

        logical_if_values = self.update_logical_if(logical_interfaces)
        rule_values.update(
            logical_if_values,
            name=name,
            comment=comment,
            action=rule_action.data,
            is_disabled=is_disabled,
            **kw
        )

        params = {} if validate else {"validate": False}
        target_href = self.href
        if add_pos is not None:
            target_href = self.add_at_position(add_pos)
        elif before or after:
            params.update(**self.add_before_after(before, after))

        return ElementCreator(
            self.__class__,
            exception=CreateRuleFailed,
            href=target_href,
            params=params,
            json=rule_values,
        )
class IPSRule(IPv4Layer2Rule):
    """
    IPS access rule; creation works as for :class:`IPv4Layer2Rule`.

    Creating an allow rule::

        ips_policy = IPSPolicy("myIPSPolicy1")
        rule1 = ips_policy.ips_ipv4_access_rules.create(
            name="ips_jump_rule",
            sources="any",
            destinations="any",
            services=[TCPService("SSH")],
            action="allow"
        )
    """

    # NOTE(review): the sibling rule classes in this module use singular
    # typeof values (e.g. "layer2_ipv4_access_rule"); confirm the plural
    # form here is intended.
    typeof = "ips_ipv4_access_rules"
class EthernetRule(RuleCommon, Rule, SubElement):
    """
    Ethernet rule of a layer 2 or IPS engine policy. When
    ``logical_interfaces`` is not given, 'any' logical interface is used.

    Creating an ethernet rule on a layer 2 policy::

        policy = Layer2Policy('layer2policy')
        policy.layer2_ethernet_rules.create(name='l2rule',
                                            logical_interfaces=['dmz'],
                                            sources='any',
                                            action='discard')
    """

    typeof = "ethernet_rule"

    # Actions accepted by this rule type
    _actions = ("allow", "discard")

    def create(
        self,
        name,
        sources=None,
        destinations=None,
        services=None,
        action="allow",
        is_disabled=False,
        logical_interfaces=None,
        add_pos=None,
        after=None,
        before=None,
        comment=None,
        validate=True,
        **kw
    ):
        """
        Create an Ethernet rule.

        .. versionchanged:: 0.7.0
            Action field now requires a list of actions as strings when using SMC
            version >= 6.6.0

        :param str name: name of rule
        :param sources: source/s for rule
        :type sources: list[str, Element]
        :param destinations: destination/s for rule
        :type destinations: list[str, Element]
        :param services: service/s for rule
        :type services: list[str, Element]
        :param str action: one of the values in ``_actions`` (default: allow)
        :param bool is_disabled: whether to disable rule or not
        :param list logical_interfaces: logical interfaces by name; when omitted
            any logical interface matches
        :param int add_pos: position to insert the rule, starting with position 1.
            If the position value is greater than the number of rules, the rule is
            inserted at the bottom. If add_pos is not provided, rule is inserted in
            position 1. Mutually exclusive with ``after`` and ``before`` params.
        :param str after: Rule tag to add this rule after. Mutually exclusive with
            ``add_pos`` and ``before`` params.
        :param str before: Rule tag to add this rule before. Mutually exclusive with
            ``add_pos`` and ``after`` params.
        :param str comment: optional comment for this rule
        :param bool validate: validate the inspection policy during rule creation.
            Default: True
        :param kw: extra fields added to the rule json as-is
        :raises MissingRequiredInput: a logical interface could not be found
        :raises CreateRuleFailed: rule creation failure
        :return: newly created rule
        :rtype: EthernetRule
        """
        rule_values = self.update_targets(sources, destinations, services)
        rule_action = self._get_action(action)

        logical_if_values = self.update_logical_if(logical_interfaces)
        rule_values.update(
            logical_if_values,
            name=name,
            comment=comment,
            action=rule_action.data,
            is_disabled=is_disabled,
            **kw
        )

        params = {} if validate else {"validate": False}
        target_href = self.href
        if add_pos is not None:
            target_href = self.add_at_position(add_pos)
        elif before or after:
            params.update(**self.add_before_after(before, after))

        return ElementCreator(
            self.__class__,
            exception=CreateRuleFailed,
            href=target_href,
            params=params,
            json=rule_values,
        )
class IPv6Rule(IPv4Rule):
    """
    IPv6 access rule. Behaves like :class:`IPv4Rule`, but sources and
    destinations must be given in IPv6 format.

    .. note:: A source or destination in IPv4 format can be submitted,
        however it will fail validation when attempting to push policy.
    """

    typeof = "fw_ipv6_access_rule"
class IPv6Layer2Rule(IPv4Layer2Rule):
    """
    IPv6 layer 2 access rule. Behaves like :class:`IPv4Layer2Rule`, but
    sources and destinations must be given in IPv6 format.

    .. note:: A source or destination in IPv4 format can be submitted,
        however it will fail validation when attempting to push policy.
    """

    typeof = "layer2_ipv6_access_rule"