# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
"""This module provides support for a stateful style of testing, where tests
attempt to find a sequence of operations that cause a breakage rather than just
a single value.
Notably, the set of steps available at any point may depend on the
execution to date.
"""
import inspect
from copy import copy
from functools import lru_cache
from io import StringIO
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
Union,
overload,
)
from unittest import TestCase
import attr
from hypothesis import strategies as st
from hypothesis._settings import (
HealthCheck,
Verbosity,
note_deprecation,
settings as Settings,
)
from hypothesis.control import _current_build_context, current_build_context
from hypothesis.core import TestFunc, given
from hypothesis.errors import InvalidArgument, InvalidDefinition
from hypothesis.internal.conjecture import utils as cu
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.reflection import (
function_digest,
get_pretty_function_description,
nicerepr,
proxies,
)
from hypothesis.internal.validation import check_type
from hypothesis.reporting import current_verbosity, report
from hypothesis.strategies._internal.featureflags import FeatureStrategy
from hypothesis.strategies._internal.strategies import (
Ex,
Ex_Inv,
OneOfStrategy,
SearchStrategy,
check_strategy,
)
from hypothesis.vendor.pretty import RepresentationPrinter
STATE_MACHINE_RUN_LABEL = cu.calc_label_from_name("another state machine step")
SHOULD_CONTINUE_LABEL = cu.calc_label_from_name("should we continue drawing")
class _OmittedArgument:
"""Sentinel class to prevent overlapping overloads in type hints. See comments
above the overloads of @rule."""
class TestCaseProperty: # pragma: no cover
def __get__(self, obj, typ=None):
if obj is not None:
typ = type(obj)
return typ._to_test_case()
def __set__(self, obj, value):
raise AttributeError("Cannot set TestCase")
def __delete__(self, obj):
raise AttributeError("Cannot delete TestCase")
def run_state_machine_as_test(state_machine_factory, *, settings=None):
"""Run a state machine definition as a test, either silently doing nothing
or printing a minimal breaking program and raising an exception.
state_machine_factory is anything which returns an instance of
RuleBasedStateMachine when called with no arguments - it can be a class or a
function. settings will be used to control the execution of the test.
"""
if settings is None:
try:
settings = state_machine_factory.TestCase.settings
check_type(Settings, settings, "state_machine_factory.TestCase.settings")
except AttributeError:
settings = Settings(deadline=None, suppress_health_check=HealthCheck.all())
check_type(Settings, settings, "settings")
@settings
@given(st.data())
def run_state_machine(factory, data):
cd = data.conjecture_data
machine = factory()
check_type(RuleBasedStateMachine, machine, "state_machine_factory()")
cd.hypothesis_runner = machine
print_steps = (
current_build_context().is_final or current_verbosity() >= Verbosity.debug
)
try:
if print_steps:
report(f"state = {machine.__class__.__name__}()")
machine.check_invariants(settings)
max_steps = settings.stateful_step_count
steps_run = 0
while True:
# We basically always want to run the maximum number of steps,
# but need to leave a small probability of terminating early
# in order to allow for reducing the number of steps once we
# find a failing test case, so we stop with probability of
# 2 ** -16 during normal operation but force a stop when we've
# generated enough steps.
cd.start_example(STATE_MACHINE_RUN_LABEL)
if steps_run == 0:
cd.draw_bits(16, forced=1)
elif steps_run >= max_steps:
cd.draw_bits(16, forced=0)
break
else:
# All we really care about is whether this value is zero
# or non-zero, so if it's > 1 we discard it and insert a
# replacement value after
cd.start_example(SHOULD_CONTINUE_LABEL)
should_continue_value = cd.draw_bits(16)
if should_continue_value > 1:
cd.stop_example(discard=True)
cd.draw_bits(16, forced=int(bool(should_continue_value)))
else:
cd.stop_example()
if should_continue_value == 0:
break
steps_run += 1
# Choose a rule to run, preferring an initialize rule if there are
# any which have not been run yet.
if machine._initialize_rules_to_run:
init_rules = [
st.tuples(st.just(rule), st.fixed_dictionaries(rule.arguments))
for rule in machine._initialize_rules_to_run
]
rule, data = cd.draw(st.one_of(init_rules))
machine._initialize_rules_to_run.remove(rule)
else:
rule, data = cd.draw(machine._rules_strategy)
# Pretty-print the values this rule was called with *before* calling
# _add_result_to_targets, to avoid printing arguments which are also
# a return value using the variable name they are assigned to.
# See https://github.com/HypothesisWorks/hypothesis/issues/2341
if print_steps:
data_to_print = {
k: machine._pretty_print(v) for k, v in data.items()
}
# Assign 'result' here in case executing the rule fails below
result = multiple()
try:
data = dict(data)
for k, v in list(data.items()):
if isinstance(v, VarReference):
data[k] = machine.names_to_values[v.name]
result = rule.function(machine, **data)
if rule.targets:
if isinstance(result, MultipleResults):
for single_result in result.values:
machine._add_result_to_targets(
rule.targets, single_result
)
else:
machine._add_result_to_targets(rule.targets, result)
elif result is not None:
fail_health_check(
settings,
"Rules should return None if they have no target bundle, "
f"but {rule.function.__qualname__} returned {result!r}",
HealthCheck.return_value,
)
finally:
if print_steps:
# 'result' is only used if the step has target bundles.
# If it does, and the result is a 'MultipleResult',
# then 'print_step' prints a multi-variable assignment.
machine._print_step(rule, data_to_print, result)
machine.check_invariants(settings)
cd.stop_example()
finally:
if print_steps:
report("state.teardown()")
machine.teardown()
# Use a machine digest to identify stateful tests in the example database
run_state_machine.hypothesis.inner_test._hypothesis_internal_add_digest = (
function_digest(state_machine_factory)
)
# Copy some attributes so @seed and @reproduce_failure "just work"
run_state_machine._hypothesis_internal_use_seed = getattr(
state_machine_factory, "_hypothesis_internal_use_seed", None
)
run_state_machine._hypothesis_internal_use_reproduce_failure = getattr(
state_machine_factory, "_hypothesis_internal_use_reproduce_failure", None
)
run_state_machine._hypothesis_internal_print_given_args = False
run_state_machine(state_machine_factory)
class StateMachineMeta(type):
def __setattr__(cls, name, value):
if name == "settings" and isinstance(value, Settings):
raise AttributeError(
(
"Assigning {cls}.settings = {value} does nothing. Assign "
"to {cls}.TestCase.settings, or use @{value} as a decorator "
"on the {cls} class."
).format(cls=cls.__name__, value=value)
)
return super().__setattr__(name, value)
[docs]class RuleBasedStateMachine(metaclass=StateMachineMeta):
"""A RuleBasedStateMachine gives you a structured way to define state machines.
The idea is that a state machine carries a bunch of types of data
divided into Bundles, and has a set of rules which may read data
from bundles (or just from normal strategies) and push data onto
bundles. At any given point a random applicable rule will be
executed.
"""
_rules_per_class: Dict[type, List[classmethod]] = {}
_invariants_per_class: Dict[type, List[classmethod]] = {}
_initializers_per_class: Dict[type, List[classmethod]] = {}
def __init__(self) -> None:
if not self.rules():
raise InvalidDefinition(f"Type {type(self).__name__} defines no rules")
self.bundles: Dict[str, list] = {}
self.name_counter = 1
self.names_to_values: Dict[str, Any] = {}
self.__stream = StringIO()
self.__printer = RepresentationPrinter(
self.__stream, context=_current_build_context.value
)
self._initialize_rules_to_run = copy(self.initialize_rules())
self._rules_strategy = RuleStrategy(self)
def _pretty_print(self, value):
if isinstance(value, VarReference):
return value.name
self.__stream.seek(0)
self.__stream.truncate(0)
self.__printer.output_width = 0
self.__printer.buffer_width = 0
self.__printer.buffer.clear()
self.__printer.pretty(value)
self.__printer.flush()
return self.__stream.getvalue()
def __repr__(self):
return f"{type(self).__name__}({nicerepr(self.bundles)})"
def _new_name(self):
result = f"v{self.name_counter}"
self.name_counter += 1
return result
def _last_names(self, n):
assert self.name_counter > n
count = self.name_counter
return [f"v{i}" for i in range(count - n, count)]
def bundle(self, name):
return self.bundles.setdefault(name, [])
@classmethod
def initialize_rules(cls):
try:
return cls._initializers_per_class[cls]
except KeyError:
pass
cls._initializers_per_class[cls] = []
for _, v in inspect.getmembers(cls):
r = getattr(v, INITIALIZE_RULE_MARKER, None)
if r is not None:
cls._initializers_per_class[cls].append(r)
return cls._initializers_per_class[cls]
@classmethod
def rules(cls):
try:
return cls._rules_per_class[cls]
except KeyError:
pass
cls._rules_per_class[cls] = []
for _, v in inspect.getmembers(cls):
r = getattr(v, RULE_MARKER, None)
if r is not None:
cls._rules_per_class[cls].append(r)
return cls._rules_per_class[cls]
@classmethod
def invariants(cls):
try:
return cls._invariants_per_class[cls]
except KeyError:
pass
target = []
for _, v in inspect.getmembers(cls):
i = getattr(v, INVARIANT_MARKER, None)
if i is not None:
target.append(i)
cls._invariants_per_class[cls] = target
return cls._invariants_per_class[cls]
def _print_step(self, rule, data, result):
self.step_count = getattr(self, "step_count", 0) + 1
output_assignment = ""
if rule.targets:
if isinstance(result, MultipleResults):
if len(result.values) == 1:
output_assignment = f"({self._last_names(1)[0]},) = "
elif result.values:
output_names = self._last_names(len(result.values))
output_assignment = ", ".join(output_names) + " = "
else:
output_assignment = self._last_names(1)[0] + " = "
report(
"{}state.{}({})".format(
output_assignment,
rule.function.__name__,
", ".join("%s=%s" % kv for kv in data.items()),
)
)
def _add_result_to_targets(self, targets, result):
name = self._new_name()
self.__printer.singleton_pprinters.setdefault(
id(result), lambda obj, p, cycle: p.text(name)
)
self.names_to_values[name] = result
for target in targets:
self.bundles.setdefault(target, []).append(VarReference(name))
def check_invariants(self, settings):
for invar in self.invariants():
if self._initialize_rules_to_run and not invar.check_during_init:
continue
if not all(precond(self) for precond in invar.preconditions):
continue
if (
current_build_context().is_final
or settings.verbosity >= Verbosity.debug
):
report(f"state.{invar.function.__name__}()")
result = invar.function(self)
if result is not None:
fail_health_check(
settings,
"The return value of an @invariant is always ignored, but "
f"{invar.function.__qualname__} returned {result!r} "
"instead of None",
HealthCheck.return_value,
)
def teardown(self):
"""Called after a run has finished executing to clean up any necessary
state.
Does nothing by default.
"""
TestCase = TestCaseProperty()
@classmethod
@lru_cache()
def _to_test_case(cls):
class StateMachineTestCase(TestCase):
settings = Settings(deadline=None, suppress_health_check=HealthCheck.all())
def runTest(self):
run_state_machine_as_test(cls)
runTest.is_hypothesis_test = True
StateMachineTestCase.__name__ = cls.__name__ + ".TestCase"
StateMachineTestCase.__qualname__ = cls.__qualname__ + ".TestCase"
return StateMachineTestCase
@attr.s()
class Rule:
targets = attr.ib()
function = attr.ib(repr=get_pretty_function_description)
arguments = attr.ib()
preconditions = attr.ib()
bundles = attr.ib(init=False)
def __attrs_post_init__(self):
arguments = {}
bundles = []
for k, v in sorted(self.arguments.items()):
assert not isinstance(v, BundleReferenceStrategy)
if isinstance(v, Bundle):
bundles.append(v)
consume = isinstance(v, BundleConsumer)
arguments[k] = BundleReferenceStrategy(v.name, consume)
else:
arguments[k] = v
self.bundles = tuple(bundles)
self.arguments_strategy = st.fixed_dictionaries(arguments)
self_strategy = st.runner()
class BundleReferenceStrategy(SearchStrategy):
def __init__(self, name, consume=False):
self.name = name
self.consume = consume
def do_draw(self, data):
machine = data.draw(self_strategy)
bundle = machine.bundle(self.name)
if not bundle:
data.mark_invalid()
# Shrink towards the right rather than the left. This makes it easier
# to delete data generated earlier, as when the error is towards the
# end there can be a lot of hard to remove padding.
position = cu.integer_range(data, 0, len(bundle) - 1, center=len(bundle))
if self.consume:
return bundle.pop(position)
else:
return bundle[position]
class Bundle(SearchStrategy[Ex]):
def __init__(self, name: str, consume: bool = False) -> None:
self.name = name
self.__reference_strategy = BundleReferenceStrategy(name, consume)
def do_draw(self, data):
machine = data.draw(self_strategy)
reference = data.draw(self.__reference_strategy)
return machine.names_to_values[reference.name]
def __repr__(self):
consume = self.__reference_strategy.consume
if consume is False:
return f"Bundle(name={self.name!r})"
return f"Bundle(name={self.name!r}, consume={consume!r})"
def calc_is_empty(self, recur):
# We assume that a bundle will grow over time
return False
def available(self, data):
# ``self_strategy`` is an instance of the ``st.runner()`` strategy.
# Hence drawing from it only returns the current state machine without
# modifying the underlying buffer.
machine = data.draw(self_strategy)
return bool(machine.bundle(self.name))
class BundleConsumer(Bundle[Ex]):
def __init__(self, bundle: Bundle[Ex]) -> None:
super().__init__(bundle.name, consume=True)
[docs]def consumes(bundle: Bundle[Ex]) -> SearchStrategy[Ex]:
"""When introducing a rule in a RuleBasedStateMachine, this function can
be used to mark bundles from which each value used in a step with the
given rule should be removed. This function returns a strategy object
that can be manipulated and combined like any other.
For example, a rule declared with
``@rule(value1=b1, value2=consumes(b2), value3=lists(consumes(b3)))``
will consume a value from Bundle ``b2`` and several values from Bundle
``b3`` to populate ``value2`` and ``value3`` each time it is executed.
"""
if not isinstance(bundle, Bundle):
raise TypeError("Argument to be consumed must be a bundle.")
return BundleConsumer(bundle)
@attr.s()
class MultipleResults(Iterable[Ex]):
values = attr.ib()
def __iter__(self):
return iter(self.values)
# We need to use an invariant typevar here to avoid a mypy error, as covariant
# typevars cannot be used as parameters.
[docs]def multiple(*args: Ex_Inv) -> MultipleResults[Ex_Inv]:
"""This function can be used to pass multiple results to the target(s) of
a rule. Just use ``return multiple(result1, result2, ...)`` in your rule.
It is also possible to use ``return multiple()`` with no arguments in
order to end a rule without passing any result.
"""
return MultipleResults(args)
def _convert_targets(targets, target):
"""Single validator and converter for target arguments."""
if target is not None:
if targets:
raise InvalidArgument(
"Passing both targets=%r and target=%r is redundant - pass "
"targets=%r instead." % (targets, target, tuple(targets) + (target,))
)
targets = (target,)
converted_targets = []
for t in targets:
if not isinstance(t, Bundle):
msg = "Got invalid target %r of type %r, but all targets must be Bundles."
if isinstance(t, OneOfStrategy):
msg += (
"\nIt looks like you passed `one_of(a, b)` or `a | b` as "
"a target. You should instead pass `targets=(a, b)` to "
"add the return value of this rule to both the `a` and "
"`b` bundles, or define a rule for each target if it "
"should be added to exactly one."
)
raise InvalidArgument(msg % (t, type(t)))
while isinstance(t, Bundle):
if isinstance(t, BundleConsumer):
note_deprecation(
f"Using consumes({t.name}) doesn't makes sense in this context. "
"This will be an error in a future version of Hypothesis.",
since="2021-09-08",
has_codemod=False,
)
t = t.name
converted_targets.append(t)
return tuple(converted_targets)
RULE_MARKER = "hypothesis_stateful_rule"
INITIALIZE_RULE_MARKER = "hypothesis_stateful_initialize_rule"
PRECONDITIONS_MARKER = "hypothesis_stateful_preconditions"
INVARIANT_MARKER = "hypothesis_stateful_invariant"
_RuleType = Callable[..., Union[MultipleResults[Ex], Ex]]
_RuleWrapper = Callable[[_RuleType[Ex]], _RuleType[Ex]]
# We cannot exclude `target` or `targets` from any of these signatures because
# otherwise they would be matched against the `kwargs`, either leading to
# overlapping overloads of incompatible return types, or a concrete
# implementation that does not accept all overloaded variant signatures.
# Although it is possible to reorder the variants to fix the former, it will
# always lead to the latter, as then the omitted parameter could be typed as
# a `SearchStrategy`, which the concrete implementation does not accept.
#
# Omitted `targets` parameters, where the default value is used, are typed with
# a special `_OmittedArgument` type. We cannot type them as `Tuple[()]`, because
# `Tuple[()]` is a subtype of `Sequence[Bundle[Ex]]`, leading to signature
# overlaps with incompatible return types. The `_OmittedArgument` type will never be
# encountered at runtime, and exists solely to annotate the default of `targets`.
# PEP 661 (Sentinel Values) might provide a more elegant alternative in the future.
#
# We could've also annotated `targets` as `Tuple[_OmittedArgument]`, but then when
# both `target` and `targets` are provided, mypy describes the type error as an
# invalid argument type for `targets` (expected `Tuple[_OmittedArgument]`, got ...).
# By annotating it as a bare `_OmittedArgument` type, mypy's error will warn that
# there is no overloaded signature matching the call, which is more descriptive.
#
# When `target` xor `targets` is provided, the function to decorate must return
# a value whose type matches the one stored in the bundle. When neither are
# provided, the function to decorate must return nothing. There is no variant
# for providing `target` and `targets`, as these parameters are mutually exclusive.
@overload
def rule(
*,
targets: Sequence[Bundle[Ex]],
target: None = ...,
**kwargs: SearchStrategy,
) -> _RuleWrapper[Ex]: # pragma: no cover
...
@overload
def rule(
*, target: Bundle[Ex], targets: _OmittedArgument = ..., **kwargs: SearchStrategy
) -> _RuleWrapper[Ex]: # pragma: no cover
...
@overload
def rule(
*,
target: None = ...,
targets: _OmittedArgument = ...,
**kwargs: SearchStrategy,
) -> Callable[[Callable[..., None]], Callable[..., None]]: # pragma: no cover
...
[docs]def rule(
*,
targets: Union[Sequence[Bundle[Ex]], _OmittedArgument] = (),
target: Optional[Bundle[Ex]] = None,
**kwargs: SearchStrategy,
) -> Union[_RuleWrapper[Ex], Callable[[Callable[..., None]], Callable[..., None]]]:
"""Decorator for RuleBasedStateMachine. Any Bundle present in ``target`` or
``targets`` will define where the end result of this function should go. If
both are empty then the end result will be discarded.
``target`` must be a Bundle, or if the result should go to multiple
bundles you can pass a tuple of them as the ``targets`` argument.
It is invalid to use both arguments for a single rule. If the result
should go to exactly one of several bundles, define a separate rule for
each case.
kwargs then define the arguments that will be passed to the function
invocation. If their value is a Bundle, or if it is ``consumes(b)``
where ``b`` is a Bundle, then values that have previously been produced
for that bundle will be provided. If ``consumes`` is used, the value
will also be removed from the bundle.
Any other kwargs should be strategies and values from them will be
provided.
"""
converted_targets = _convert_targets(targets, target)
for k, v in kwargs.items():
check_strategy(v, name=k)
def accept(f):
if getattr(f, INVARIANT_MARKER, None):
raise InvalidDefinition(
"A function cannot be used for both a rule and an invariant.",
Settings.default,
)
existing_rule = getattr(f, RULE_MARKER, None)
existing_initialize_rule = getattr(f, INITIALIZE_RULE_MARKER, None)
if existing_rule is not None or existing_initialize_rule is not None:
raise InvalidDefinition(
"A function cannot be used for two distinct rules. ", Settings.default
)
preconditions = getattr(f, PRECONDITIONS_MARKER, ())
rule = Rule(
targets=converted_targets,
arguments=kwargs,
function=f,
preconditions=preconditions,
)
@proxies(f)
def rule_wrapper(*args, **kwargs):
return f(*args, **kwargs)
setattr(rule_wrapper, RULE_MARKER, rule)
return rule_wrapper
return accept
# See also comments of `rule`'s overloads.
@overload
def initialize(
*,
targets: Sequence[Bundle[Ex]],
target: None = ...,
**kwargs: SearchStrategy,
) -> _RuleWrapper[Ex]: # pragma: no cover
...
@overload
def initialize(
*, target: Bundle[Ex], targets: _OmittedArgument = ..., **kwargs: SearchStrategy
) -> _RuleWrapper[Ex]: # pragma: no cover
...
@overload
def initialize(
*,
target: None = ...,
targets: _OmittedArgument = ...,
**kwargs: SearchStrategy,
) -> Callable[[Callable[..., None]], Callable[..., None]]: # pragma: no cover
...
[docs]def initialize(
*,
targets: Union[Sequence[Bundle[Ex]], _OmittedArgument] = (),
target: Optional[Bundle[Ex]] = None,
**kwargs: SearchStrategy,
) -> Union[_RuleWrapper[Ex], Callable[[Callable[..., None]], Callable[..., None]]]:
"""Decorator for RuleBasedStateMachine.
An initialize decorator behaves like a rule, but all ``@initialize()`` decorated
methods will be called before any ``@rule()`` decorated methods, in an arbitrary
order. Each ``@initialize()`` method will be called exactly once per run, unless
one raises an exception - after which only the ``.teardown()`` method will be run.
``@initialize()`` methods may not have preconditions.
"""
converted_targets = _convert_targets(targets, target)
for k, v in kwargs.items():
check_strategy(v, name=k)
def accept(f):
if getattr(f, INVARIANT_MARKER, None):
raise InvalidDefinition(
"A function cannot be used for both a rule and an invariant.",
Settings.default,
)
existing_rule = getattr(f, RULE_MARKER, None)
existing_initialize_rule = getattr(f, INITIALIZE_RULE_MARKER, None)
if existing_rule is not None or existing_initialize_rule is not None:
raise InvalidDefinition(
"A function cannot be used for two distinct rules. ", Settings.default
)
preconditions = getattr(f, PRECONDITIONS_MARKER, ())
if preconditions:
raise InvalidDefinition(
"An initialization rule cannot have a precondition. ", Settings.default
)
rule = Rule(
targets=converted_targets,
arguments=kwargs,
function=f,
preconditions=preconditions,
)
@proxies(f)
def rule_wrapper(*args, **kwargs):
return f(*args, **kwargs)
setattr(rule_wrapper, INITIALIZE_RULE_MARKER, rule)
return rule_wrapper
return accept
@attr.s()
class VarReference:
name = attr.ib()
# There are multiple alternatives for annotating the `precond` type, all of them
# have drawbacks. See https://github.com/HypothesisWorks/hypothesis/pull/3068#issuecomment-906642371
[docs]def precondition(precond: Callable[[Any], bool]) -> Callable[[TestFunc], TestFunc]:
"""Decorator to apply a precondition for rules in a RuleBasedStateMachine.
Specifies a precondition for a rule to be considered as a valid step in the
state machine, which is more efficient than using :func:`~hypothesis.assume`
within the rule. The ``precond`` function will be called with the instance of
RuleBasedStateMachine and should return True or False. Usually it will need
to look at attributes on that instance.
For example::
class MyTestMachine(RuleBasedStateMachine):
state = 1
@precondition(lambda self: self.state != 0)
@rule(numerator=integers())
def divide_with(self, numerator):
self.state = numerator / self.state
If multiple preconditions are applied to a single rule, it is only considered
a valid step when all of them return True. Preconditions may be applied to
invariants as well as rules.
"""
def decorator(f):
@proxies(f)
def precondition_wrapper(*args, **kwargs):
return f(*args, **kwargs)
existing_initialize_rule = getattr(f, INITIALIZE_RULE_MARKER, None)
if existing_initialize_rule is not None:
raise InvalidDefinition(
"An initialization rule cannot have a precondition. ", Settings.default
)
rule = getattr(f, RULE_MARKER, None)
invariant = getattr(f, INVARIANT_MARKER, None)
if rule is not None:
assert invariant is None
new_rule = attr.evolve(rule, preconditions=rule.preconditions + (precond,))
setattr(precondition_wrapper, RULE_MARKER, new_rule)
elif invariant is not None:
assert rule is None
new_invariant = attr.evolve(
invariant, preconditions=invariant.preconditions + (precond,)
)
setattr(precondition_wrapper, INVARIANT_MARKER, new_invariant)
else:
setattr(
precondition_wrapper,
PRECONDITIONS_MARKER,
getattr(f, PRECONDITIONS_MARKER, ()) + (precond,),
)
return precondition_wrapper
return decorator
@attr.s()
class Invariant:
function = attr.ib(repr=get_pretty_function_description)
preconditions = attr.ib()
check_during_init = attr.ib()
[docs]def invariant(*, check_during_init: bool = False) -> Callable[[TestFunc], TestFunc]:
"""Decorator to apply an invariant for rules in a RuleBasedStateMachine.
The decorated function will be run after every rule and can raise an
exception to indicate failed invariants.
For example::
class MyTestMachine(RuleBasedStateMachine):
state = 1
@invariant()
def is_nonzero(self):
assert self.state != 0
By default, invariants are only checked after all
:func:`@initialize() <hypothesis.stateful.initialize>` rules have been run.
Pass ``check_during_init=True`` for invariants which can also be checked
during initialization.
"""
check_type(bool, check_during_init, "check_during_init")
def accept(f):
if getattr(f, RULE_MARKER, None) or getattr(f, INITIALIZE_RULE_MARKER, None):
raise InvalidDefinition(
"A function cannot be used for both a rule and an invariant.",
Settings.default,
)
existing_invariant = getattr(f, INVARIANT_MARKER, None)
if existing_invariant is not None:
raise InvalidDefinition(
"A function cannot be used for two distinct invariants.",
Settings.default,
)
preconditions = getattr(f, PRECONDITIONS_MARKER, ())
invar = Invariant(
function=f,
preconditions=preconditions,
check_during_init=check_during_init,
)
@proxies(f)
def invariant_wrapper(*args, **kwargs):
return f(*args, **kwargs)
setattr(invariant_wrapper, INVARIANT_MARKER, invar)
return invariant_wrapper
return accept
LOOP_LABEL = cu.calc_label_from_name("RuleStrategy loop iteration")
class RuleStrategy(SearchStrategy):
def __init__(self, machine):
super().__init__()
self.machine = machine
self.rules = list(machine.rules())
self.enabled_rules_strategy = st.shared(
FeatureStrategy(), key=("enabled rules", machine)
)
# The order is a bit arbitrary. Primarily we're trying to group rules
# that write to the same location together, and to put rules with no
# target first as they have less effect on the structure. We order from
# fewer to more arguments on grounds that it will plausibly need less
# data. This probably won't work especially well and we could be
# smarter about it, but it's better than just doing it in definition
# order.
self.rules.sort(
key=lambda rule: (
sorted(rule.targets),
len(rule.arguments),
rule.function.__name__,
)
)
def __repr__(self):
return "{}(machine={}({{...}}))".format(
self.__class__.__name__,
self.machine.__class__.__name__,
)
def do_draw(self, data):
if not any(self.is_valid(rule) for rule in self.rules):
msg = f"No progress can be made from state {self.machine!r}"
raise InvalidDefinition(msg) from None
feature_flags = data.draw(self.enabled_rules_strategy)
# Note: The order of the filters here is actually quite important,
# because checking is_enabled makes choices, so increases the size of
# the choice sequence. This means that if we are in a case where many
# rules are invalid we will make a lot more choices if we ask if they
# are enabled before we ask if they are valid, so our test cases will
# be artificially large.
rule = data.draw(
st.sampled_from(self.rules)
.filter(self.is_valid)
.filter(lambda r: feature_flags.is_enabled(r.function.__name__))
)
return (rule, data.draw(rule.arguments_strategy))
def is_valid(self, rule):
if not all(precond(self.machine) for precond in rule.preconditions):
return False
for b in rule.bundles:
bundle = self.machine.bundle(b.name)
if not bundle:
return False
return True