#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import croniter
import eventlet
import json
import netaddr
import zoneinfo
from neutron_lib.api import validators
from oslo_utils import timeutils
from heat.common.i18n import _
from heat.common import netutils as heat_netutils
from heat.engine import constraints
class TestConstraintDelay(constraints.BaseCustomConstraint):
    """Constraint whose client-side check only pauses the caller.

    NOTE(review): judging by the name this exists to simulate a slow
    validation in tests -- confirm against its users.
    """

    def validate_with_client(self, client, value):
        """Sleep via eventlet, using ``value`` as the sleep argument.

        :param client: not used by this check.
        :param value: handed unchanged to ``eventlet.sleep``.
        """
        delay = value
        eventlet.sleep(delay)
 
class IPConstraint(constraints.BaseCustomConstraint):
    """Accept only string values that pass neutron_lib's IP address check."""

    def validate(self, value, context, template=None):
        """Tell whether ``value`` is a string holding a valid IP address.

        :param value: candidate value; anything that is not a ``str`` is
            rejected without consulting the validator.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: True if the validator reports no problem, else False.
        """
        # The message is fixed, so it is set before any check runs.
        self._error_message = 'Invalid IP address'
        # ``and`` short-circuits, keeping non-strings away from the validator;
        # the validator signals success by returning None.
        return (isinstance(value, str) and
                validators.validate_ip_address(value) is None)
 
class MACConstraint(constraints.BaseCustomConstraint):
    """Accept only values that netaddr considers a valid MAC address."""

    def validate(self, value, context, template=None):
        """Return the verdict of ``netaddr.valid_mac`` for ``value``.

        :param value: candidate MAC address.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: whatever ``netaddr.valid_mac`` returns for ``value``.
        """
        self._error_message = 'Invalid MAC address.'
        is_mac = netaddr.valid_mac(value)
        return is_mac
 
class DNSNameConstraint(constraints.BaseCustomConstraint):
    """Accept only values that pass Heat's DNS name format validation."""

    def validate(self, value, context, template=None):
        """Check ``value`` with ``heat_netutils.validate_dns_format``.

        :param value: candidate DNS name.
        :param context: not used by this check.
        :param template: not used by this check; accepted so the signature
            matches the other constraints in this module and a caller may
            pass it positionally.
        :returns: True if the format check raised nothing; False (with
            ``_error_message`` describing the reason) on ``ValueError``.
        """
        try:
            heat_netutils.validate_dns_format(value)
        except ValueError as ex:
            self._error_message = ("'%(value)s' not in valid format."
                                   " Reason: %(reason)s") % {
                                       'value': value,
                                       'reason': str(ex)}
            return False
        return True
 
class RelativeDNSNameConstraint(DNSNameConstraint):
    """DNS name constraint that additionally rejects fully qualified names."""

    def validate(self, value, context, template=None):
        """Check that ``value`` is a relative (non-FQDN) DNS name.

        :param value: candidate name; falsy values are accepted as-is.
        :param context: forwarded to the parent check.
        :param template: not used by this check; accepted so the signature
            matches the other constraints in this module.
        :returns: True if the name is acceptable, otherwise False with
            ``_error_message`` set.
        """
        if not value:
            return True
        # A trailing dot marks a fully qualified name.
        if value.endswith('.'):
            self._error_message = _("'%s' is a FQDN. It should be a "
                                    "relative domain name.") % value
            return False
        length = len(value)
        # Keep headroom below the FQDN limit so a domain can be appended.
        if length > heat_netutils.FQDN_MAX_LEN - 3:
            self._error_message = _("'%(value)s' contains '%(length)s' "
                                    "characters. Adding a domain name will "
                                    "cause it to exceed the maximum length "
                                    "of a FQDN of '%(max_len)s'.") % {
                                        "value": value,
                                        "length": length,
                                        "max_len": heat_netutils.FQDN_MAX_LEN}
            return False
        # Only the two arguments the parent is known to take are forwarded.
        return super().validate(value, context)
 
class DNSDomainConstraint(DNSNameConstraint):
    """DNS name constraint that additionally requires a trailing dot."""

    def validate(self, value, context, template=None):
        """Check that ``value`` is a well-formed DNS name ending in ``'.'``.

        :param value: candidate domain; falsy values are accepted as-is.
        :param context: forwarded to the parent check.
        :param template: not used by this check; accepted so the signature
            matches the other constraints in this module.
        :returns: True if the domain is acceptable, otherwise False with
            ``_error_message`` set (by the parent for format errors).
        """
        if not value:
            return True
        # Only the two arguments the parent is known to take are forwarded.
        if not super().validate(value, context):
            return False
        if not value.endswith('.'):
            self._error_message = ("'%s' must end with '.'.") % value
            return False
        return True
 
class CIDRConstraint(constraints.BaseCustomConstraint):
    """Accept only values that parse as a network and pass subnet checks."""

    def validate(self, value, context, template=None):
        """Validate ``value`` as a CIDR.

        The value must first be parseable by ``netaddr.IPNetwork`` (after
        abbreviation expansion) and then pass neutron_lib's
        ``validate_subnet``.

        :param value: candidate CIDR.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: True on success; False with ``_error_message`` set to the
            validator's message or to the text of the exception raised.
        """
        try:
            # Constructed only for its parsing side effect: raises on bad
            # input, and the resulting object is discarded.
            netaddr.IPNetwork(netaddr.cidr_abbrev_to_verbose(value))
            problem = validators.validate_subnet(value)
            if problem is None:
                return True
            self._error_message = problem
            return False
        except Exception as exc:
            self._error_message = 'Invalid net cidr %s ' % str(exc)
            return False
 
class IPCIDRConstraint(constraints.BaseCustomConstraint):
    """Accept a value that is either a plain IP address or a CIDR."""

    def validate(self, value, context, template=None):
        """Validate ``value`` as a subnet if it contains '/', else as an IP.

        :param value: candidate IP address or CIDR.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: True when the chosen validator reports no problem; False
            with ``_error_message`` set otherwise, including when any
            exception is raised along the way.
        """
        try:
            # The membership test runs inside the try so that values which
            # do not support ``in`` are reported as invalid, not raised.
            checker = (validators.validate_subnet if '/' in value
                       else validators.validate_ip_address)
            problem = checker(value)
            if problem is None:
                return True
            self._error_message = problem
            return False
        except Exception:
            self._error_message = '{} is not a valid IP or CIDR'.format(
                value)
            return False
 
class ISO8601Constraint(constraints.BaseCustomConstraint):
    """Accept only values that oslo's ``parse_isotime`` can parse."""

    def validate(self, value, context, template=None):
        """Return True if ``timeutils.parse_isotime(value)`` succeeds.

        :param value: candidate timestamp.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: False if parsing raises any exception, True otherwise.
        """
        try:
            timeutils.parse_isotime(value)
            return True
        except Exception:
            return False
 
class CRONExpressionConstraint(constraints.BaseCustomConstraint):
    """Accept only values from which a ``croniter`` can be built."""

    def validate(self, value, context, template=None):
        """Validate ``value`` as a CRON expression.

        :param value: candidate expression; falsy values are accepted.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: True if ``value`` is falsy or ``croniter.croniter``
            accepts it; False with ``_error_message`` set otherwise.
        """
        if value:
            try:
                croniter.croniter(value)
            except Exception as exc:
                self._error_message = _(
                    'Invalid CRON expression: %s') % str(exc)
                return False
        return True
 
class TimezoneConstraint(constraints.BaseCustomConstraint):
    """Accept only values that ``zoneinfo.ZoneInfo`` can load."""

    def validate(self, value, context, template=None):
        """Validate ``value`` as a time zone key.

        :param value: candidate time zone; falsy values are accepted.
        :param context: not used by this check.
        :param template: not used by this check.
        :returns: True if ``value`` is falsy or ``zoneinfo.ZoneInfo``
            accepts it; False with ``_error_message`` set otherwise.
        """
        if not value:
            return True
        try:
            zoneinfo.ZoneInfo(value)
        except Exception as exc:
            self._error_message = _(
                'Invalid timezone: %s') % str(exc)
            return False
        else:
            return True
 
class ExpirationConstraint(constraints.BaseCustomConstraint):
    """Accept only ISO 8601 timestamps that lie after the current time."""

    def validate(self, value, context, template=None):
        """Validate ``value`` as an expiration time that has not passed.

        :param value: candidate timestamp string; falsy values are
            accepted. Surrounding whitespace is stripped before parsing.
        :param context: not used by this check.
        :param template: not used by this check; accepted so the signature
            matches the other constraints in this module.
        :returns: True if ``value`` is falsy or parses to a time later than
            ``timeutils.utcnow()``; False with ``_error_message`` set when
            parsing fails or the time is not in the future.
        """
        if not value:
            return True
        try:
            expiration_tz = timeutils.parse_isotime(value.strip())
            # Normalise before comparing with utcnow().
            expiration = timeutils.normalize_time(expiration_tz)
            if expiration > timeutils.utcnow():
                return True
            # Raised inside the try on purpose: the handler below turns it
            # into the same error message as a parse failure.
            raise ValueError(_('Expiration time is out of date.'))
        except Exception as ex:
            self._error_message = (_(
                'Expiration {0} is invalid: {1}').format(value, str(ex)))
        return False
 
class JsonStringConstraint(constraints.BaseCustomConstraint):
    """Accept only values that ``json.loads`` can decode."""

    def validate(self, value, context, template=None):
        """Validate ``value`` as a JSON document.

        :param value: candidate JSON text; falsy values are accepted.
        :param context: not used by this check.
        :param template: not used by this check; accepted so the signature
            matches the other constraints in this module.
        :returns: True if ``value`` is falsy or decodes as JSON; False with
            ``_error_message`` set otherwise.
        """
        if not value:
            return True
        try:
            json.loads(value)
            return True
        except (TypeError, ValueError) as ex:
            # ValueError covers JSONDecodeError (and undecodable bytes);
            # TypeError is what json.loads raises for a non-string such as
            # an int or dict. Both mean "not a JSON string", so report a
            # validation failure instead of letting the exception escape.
            self._error_message = (_(
                'JSON string {0} is invalid: {1}').format(value, str(ex)))
        return False