# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

from collections import OrderedDict, abc
from copy import copy
from datetime import datetime, timedelta
from typing import Any, List, Optional, Sequence, Set, Union

import attr
import numpy as np
import pandas

from hypothesis import strategies as st
from hypothesis._settings import note_deprecation
from hypothesis.control import reject
from hypothesis.errors import InvalidArgument
from hypothesis.extra import numpy as npst
from hypothesis.internal.conjecture import utils as cu
from hypothesis.internal.coverage import check, check_function
from hypothesis.internal.reflection import get_pretty_function_description
from hypothesis.internal.validation import (
check_type,
check_valid_interval,
check_valid_size,
try_convert,
)
from hypothesis.strategies._internal.strategies import Ex, check_strategy
from hypothesis.strategies._internal.utils import cacheable, defines_strategy

try:
from pandas.api.types import is_categorical_dtype
except ImportError:
def is_categorical_dtype(dt):
if isinstance(dt, np.dtype):
return False
return dt == "category"


def dtype_for_elements_strategy(s):
return st.shared(
s.map(lambda x: pandas.Series([x]).dtype),
key=("hypothesis.extra.pandas.dtype_for_elements_strategy", s),
)


def infer_dtype_if_necessary(dtype, values, elements, draw):
if dtype is None and not values:
return draw(dtype_for_elements_strategy(elements))
return dtype


@check_function
def elements_and_dtype(elements, dtype, source=None):
if source is None:
prefix = ""
else:
prefix = f"{source}."
if elements is not None:
check_strategy(elements, f"{prefix}elements")
else:
with check("dtype is not None"):
if dtype is None:
raise InvalidArgument(
f"At least one of {prefix}elements or {prefix}dtype must be provided."
)
with check("is_categorical_dtype"):
if is_categorical_dtype(dtype):
raise InvalidArgument(
f"{prefix}dtype is categorical, which is currently unsupported"
)
if isinstance(dtype, type) and np.dtype(dtype).kind == "O" and dtype is not object:
note_deprecation(
f"Passed dtype={dtype!r} is not a valid Pandas dtype. We'll treat it as "
"dtype=object for now, but this will be an error in a future version.",
since="2021-12-31",
has_codemod=False,
)
if isinstance(dtype, st.SearchStrategy):
raise InvalidArgument(
f"Passed dtype={dtype!r} is a strategy, but we require a concrete dtype "
"here. See https://stackoverflow.com/q/74355937 for workaround patterns."
)
dtype = try_convert(np.dtype, dtype, "dtype")
if elements is None:
elements = npst.from_dtype(dtype)
elif dtype is not None:
def convert_element(value):
name = f"draw({prefix}elements)"
try:
return np.array([value], dtype=dtype)[0]
            except TypeError:
                raise InvalidArgument(
                    f"Cannot convert {name}={value!r} of type "
                    f"{type(value).__name__} to dtype {dtype.str}"
                ) from None
except ValueError:
raise InvalidArgument(
f"Cannot convert {name}={value!r} to type {dtype.str}"
) from None
elements = elements.map(convert_element)
assert elements is not None
return elements, dtype


class ValueIndexStrategy(st.SearchStrategy):
def __init__(self, elements, dtype, min_size, max_size, unique, name):
super().__init__()
self.elements = elements
self.dtype = dtype
self.min_size = min_size
self.max_size = max_size
self.unique = unique
self.name = name

    def do_draw(self, data):
result = []
seen = set()
iterator = cu.many(
data,
min_size=self.min_size,
max_size=self.max_size,
average_size=(self.min_size + self.max_size) / 2,
)
while iterator.more():
elt = data.draw(self.elements)
if self.unique:
if elt in seen:
iterator.reject()
continue
seen.add(elt)
result.append(elt)
dtype = infer_dtype_if_necessary(
dtype=self.dtype, values=result, elements=self.elements, draw=data.draw
)
return pandas.Index(
result, dtype=dtype, tupleize_cols=False, name=data.draw(self.name)
)


DEFAULT_MAX_SIZE = 10


@cacheable
@defines_strategy()
def range_indexes(
min_size: int = 0,
max_size: Optional[int] = None,
name: st.SearchStrategy[Optional[str]] = st.none(),
) -> st.SearchStrategy[pandas.RangeIndex]:
"""Provides a strategy which generates an :class:`~pandas.Index` whose
values are 0, 1, ..., n for some n.
Arguments:
* min_size is the smallest number of elements the index can have.
* max_size is the largest number of elements the index can have. If None
it will default to some suitable value based on min_size.
* name is the name of the index. If st.none(), the index will have no name.
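
    For example (an illustrative sketch; the drawn size, and therefore the
    exact output, varies between runs):

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import range_indexes
        >>> range_indexes(min_size=2, max_size=5).example()
        RangeIndex(start=0, stop=3, step=1)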
"""
check_valid_size(min_size, "min_size")
check_valid_size(max_size, "max_size")
if max_size is None:
max_size = min([min_size + DEFAULT_MAX_SIZE, 2**63 - 1])
check_valid_interval(min_size, max_size, "min_size", "max_size")
    check_strategy(name, "name")
return st.builds(pandas.RangeIndex, st.integers(min_size, max_size), name=name)


@cacheable
@defines_strategy()
def indexes(
*,
elements: Optional[st.SearchStrategy[Ex]] = None,
dtype: Any = None,
min_size: int = 0,
max_size: Optional[int] = None,
unique: bool = True,
name: st.SearchStrategy[Optional[str]] = st.none(),
) -> st.SearchStrategy[pandas.Index]:
"""Provides a strategy for producing a :class:`pandas.Index`.
Arguments:
* elements is a strategy which will be used to generate the individual
values of the index. If None, it will be inferred from the dtype. Note:
even if the elements strategy produces tuples, the generated value
will not be a MultiIndex, but instead be a normal index whose elements
are tuples.
* dtype is the dtype of the resulting index. If None, it will be inferred
from the elements strategy. At least one of dtype or elements must be
provided.
* min_size is the minimum number of elements in the index.
* max_size is the maximum number of elements in the index. If None then it
will default to a suitable small size. If you want larger indexes you
should pass a max_size explicitly.
* unique specifies whether all of the elements in the resulting index
should be distinct.
* name is a strategy for strings or ``None``, which will be passed to
the :class:`pandas.Index` constructor.
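
    For example (a usage sketch; the drawn values vary, so only the size is
    checked here):

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import indexes
        >>> idx = indexes(dtype=int, min_size=2, max_size=3).example()
        >>> 2 <= len(idx) <= 3
        True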
"""
check_valid_size(min_size, "min_size")
check_valid_size(max_size, "max_size")
check_valid_interval(min_size, max_size, "min_size", "max_size")
check_type(bool, unique, "unique")
elements, dtype = elements_and_dtype(elements, dtype)
if max_size is None:
max_size = min_size + DEFAULT_MAX_SIZE
return ValueIndexStrategy(elements, dtype, min_size, max_size, unique, name)


@defines_strategy()
def series(
*,
elements: Optional[st.SearchStrategy[Ex]] = None,
dtype: Any = None,
index: Optional[st.SearchStrategy[Union[Sequence, pandas.Index]]] = None,
fill: Optional[st.SearchStrategy[Ex]] = None,
unique: bool = False,
name: st.SearchStrategy[Optional[str]] = st.none(),
) -> st.SearchStrategy[pandas.Series]:
"""Provides a strategy for producing a :class:`pandas.Series`.
Arguments:
* elements: a strategy that will be used to generate the individual
values in the series. If None, we will attempt to infer a suitable
default from the dtype.
    * dtype: the dtype of the resulting series; this may be any value
      that can be passed to :class:`numpy.dtype`. If None, pandas's
      standard behaviour is used to infer it from the type of the values
      drawn from the elements strategy. Note that if the type of values
      that comes out of your elements strategy varies, then so will the
      resulting dtype of the series.
* index: If not None, a strategy for generating indexes for the
resulting Series. This can generate either :class:`pandas.Index`
objects or any sequence of values (which will be passed to the
Index constructor).
You will probably find it most convenient to use the
:func:`~hypothesis.extra.pandas.indexes` or
:func:`~hypothesis.extra.pandas.range_indexes` function to produce
values for this argument.
* name: is a strategy for strings or ``None``, which will be passed to
the :class:`pandas.Series` constructor.
    Usage:

    .. code-block:: pycon

        >>> series(dtype=int).example()
        0   -2001747478
        1    1153062837
        dtype: int64
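
    As a further sketch, an explicit index strategy can be supplied (the
    parameters here are illustrative and the generated values will differ):

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import range_indexes, series
        >>> s = series(dtype=float,
        ...            index=range_indexes(min_size=3, max_size=3)).example()
        >>> len(s)
        3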
"""
if index is None:
index = range_indexes()
else:
check_strategy(index, "index")
elements, dtype = elements_and_dtype(elements, dtype)
index_strategy = index
@st.composite
def result(draw):
index = draw(index_strategy)
if len(index) > 0:
if dtype is not None:
result_data = draw(
npst.arrays(
dtype=dtype,
elements=elements,
shape=len(index),
fill=fill,
unique=unique,
)
)
else:
result_data = list(
draw(
npst.arrays(
dtype=object,
elements=elements,
shape=len(index),
fill=fill,
unique=unique,
)
)
)
return pandas.Series(result_data, index=index, dtype=dtype, name=draw(name))
else:
return pandas.Series(
(),
index=index,
dtype=dtype
if dtype is not None
else draw(dtype_for_elements_strategy(elements)),
name=draw(name),
)
return result()


@attr.s(slots=True)
class column:
"""Data object for describing a column in a DataFrame.
Arguments:
* name: the column name, or None to default to the column position. Must
be hashable, but can otherwise be any value supported as a pandas column
name.
* elements: the strategy for generating values in this column, or None
to infer it from the dtype.
* dtype: the dtype of the column, or None to infer it from the element
strategy. At least one of dtype or elements must be provided.
* fill: A default value for elements of the column. See
:func:`~hypothesis.extra.numpy.arrays` for a full explanation.
* unique: If all values in this column should be distinct.
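
    For instance (the name and dtype here are arbitrary):

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import column
        >>> col = column(name="A", dtype=int, unique=True)
        >>> col.name, col.unique
        ('A', True)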
"""
name = attr.ib(default=None)
elements = attr.ib(default=None)
dtype = attr.ib(default=None, repr=get_pretty_function_description)
fill = attr.ib(default=None)
unique = attr.ib(default=False)


def columns(
names_or_number: Union[int, Sequence[str]],
*,
dtype: Any = None,
elements: Optional[st.SearchStrategy[Ex]] = None,
fill: Optional[st.SearchStrategy[Ex]] = None,
unique: bool = False,
) -> List[column]:
"""A convenience function for producing a list of :class:`column` objects
of the same general shape.
The names_or_number argument is either a sequence of values, the
elements of which will be used as the name for individual column
objects, or a number, in which case that many unnamed columns will
be created. All other arguments are passed through verbatim to
create the columns.
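
    For example (the names here are arbitrary):

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import columns
        >>> [c.name for c in columns(["A", "B"], dtype=int)]
        ['A', 'B']
        >>> [c.name for c in columns(3, dtype=float)]
        [None, None, None]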
"""
if isinstance(names_or_number, (int, float)):
names: List[Union[int, str, None]] = [None] * names_or_number
else:
names = list(names_or_number)
return [
column(name=n, dtype=dtype, elements=elements, fill=fill, unique=unique)
for n in names
]


@defines_strategy()
def data_frames(
columns: Optional[Sequence[column]] = None,
*,
rows: Optional[st.SearchStrategy[Union[dict, Sequence[Any]]]] = None,
index: Optional[st.SearchStrategy[Ex]] = None,
) -> st.SearchStrategy[pandas.DataFrame]:
"""Provides a strategy for producing a :class:`pandas.DataFrame`.
Arguments:
* columns: An iterable of :class:`column` objects describing the shape
of the generated DataFrame.
* rows: A strategy for generating a row object. Should generate
either dicts mapping column names to values or a sequence mapping
column position to the value in that position (note that unlike the
:class:`pandas.DataFrame` constructor, single values are not allowed
here. Passing e.g. an integer is an error, even if there is only one
column).
At least one of rows and columns must be provided. If both are
provided then the generated rows will be validated against the
columns and an error will be raised if they don't match.
Caveats on using rows:
* In general you should prefer using columns to rows, and only use
rows if the columns interface is insufficiently flexible to
describe what you need - you will get better performance and
example quality that way.
* If you provide rows and not columns, then the shape and dtype of
the resulting DataFrame may vary. e.g. if you have a mix of int
and float in the values for one column in your row entries, the
column will sometimes have an integral dtype and sometimes a float.
* index: If not None, a strategy for generating indexes for the
resulting DataFrame. This can generate either :class:`pandas.Index`
objects or any sequence of values (which will be passed to the
Index constructor).
You will probably find it most convenient to use the
:func:`~hypothesis.extra.pandas.indexes` or
:func:`~hypothesis.extra.pandas.range_indexes` function to produce
values for this argument.
    Usage:

    The expected usage pattern is that you use :class:`column` and
    :func:`columns` to specify a fixed shape of the DataFrame you want as
    follows. For example the following gives a two column data frame:

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import column, data_frames
        >>> data_frames([
        ... column('A', dtype=int), column('B', dtype=float)]).example()
                     A              B
        0   2021915903  1.793898e+232
        1   1146643993            inf
        2  -2096165693   1.000000e+07

    If you want the values in different columns to interact in some way you
    can use the rows argument. For example the following gives a two column
    DataFrame where the value in the first column is always at most the value
    in the second:

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import column, data_frames
        >>> import hypothesis.strategies as st
        >>> data_frames(
        ...     rows=st.tuples(st.floats(allow_nan=False),
        ...                    st.floats(allow_nan=False)).map(sorted)
        ... ).example()
                        0             1
        0   -3.402823e+38  9.007199e+15
        1  -1.562796e-298  5.000000e-01

    You can also combine the two:

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import columns, data_frames
        >>> import hypothesis.strategies as st
        >>> data_frames(
        ...     columns=columns(["lo", "hi"], dtype=float),
        ...     rows=st.tuples(st.floats(allow_nan=False),
        ...                    st.floats(allow_nan=False)).map(sorted)
        ... ).example()
                     lo            hi
        0    9.314723e-49  4.353037e+45
        1   -9.999900e-01  1.000000e+07
        2  -2.152861e+134 -1.069317e-73

    (Note that the column dtype must still be specified and will not be
    inferred from the rows. This restriction may be lifted in future).
    Combining rows and columns has the following behaviour:

    * The column names and dtypes will be used.
    * If the column is required to be unique, this will be enforced.
    * Any values missing from the generated rows will be provided using the
      column's fill (see the sketch below).
    * Any values in the row not present in the column specification (for
      dicts, keys with no corresponding column name; for sequences, more
      items than there are columns) will result in InvalidArgument being
      raised.
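
    As a sketch of the combined form, a column that the rows do not cover can
    be completed from its fill strategy (the column names and strategies here
    are purely illustrative, and the generated values will vary):

    .. code-block:: pycon

        >>> from hypothesis.extra.pandas import column, data_frames
        >>> import hypothesis.strategies as st
        >>> df = data_frames(
        ...     columns=[column("a", dtype=int),
        ...              column("b", dtype=float, fill=st.just(0.0))],
        ...     rows=st.tuples(st.integers(0, 100)),
        ... ).example()
        >>> set(df.columns) == {"a", "b"}
        True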
"""
if index is None:
index = range_indexes()
else:
check_strategy(index, "index")
index_strategy = index
if columns is None:
if rows is None:
raise InvalidArgument("At least one of rows and columns must be provided")
else:
@st.composite
def rows_only(draw):
index = draw(index_strategy)
@check_function
def row():
result = draw(rows)
check_type(abc.Iterable, result, "draw(row)")
return result
if len(index) > 0:
return pandas.DataFrame([row() for _ in index], index=index)
else:
# If we haven't drawn any rows we need to draw one row and
# then discard it so that we get a consistent shape for the
# DataFrame.
base = pandas.DataFrame([row()])
return base.drop(0)
return rows_only()
assert columns is not None
cols = try_convert(tuple, columns, "columns")
rewritten_columns = []
column_names: Set[str] = set()
for i, c in enumerate(cols):
check_type(column, c, f"columns[{i}]")
c = copy(c)
if c.name is None:
label = f"columns[{i}]"
c.name = i
else:
label = c.name
try:
hash(c.name)
except TypeError:
raise InvalidArgument(
f"Column names must be hashable, but columns[{i}].name was "
f"{c.name!r} of type {type(c.name).__name__}, which cannot be hashed."
) from None
if c.name in column_names:
raise InvalidArgument(f"duplicate definition of column name {c.name!r}")
column_names.add(c.name)
c.elements, c.dtype = elements_and_dtype(c.elements, c.dtype, label)
if c.dtype is None and rows is not None:
raise InvalidArgument(
"Must specify a dtype for all columns when combining rows with columns."
)
c.fill = npst.fill_for(
fill=c.fill, elements=c.elements, unique=c.unique, name=label
)
rewritten_columns.append(c)
if rows is None:
@st.composite
def just_draw_columns(draw):
index = draw(index_strategy)
local_index_strategy = st.just(index)
data = OrderedDict((c.name, None) for c in rewritten_columns)
# Depending on how the columns are going to be generated we group
# them differently to get better shrinking. For columns with fill
# enabled, the elements can be shrunk independently of the size,
# so we can just shrink by shrinking the index then shrinking the
# length and are generally much more free to move data around.
# For columns with no filling the problem is harder, and drawing
# them like that would result in rows being very far apart from
# each other in the underlying data stream, which gets in the way
# of shrinking. So what we do is reorder and draw those columns
# row wise, so that the values of each row are next to each other.
# This makes life easier for the shrinker when deleting blocks of
# data.
columns_without_fill = [c for c in rewritten_columns if c.fill.is_empty]
if columns_without_fill:
for c in columns_without_fill:
data[c.name] = pandas.Series(
np.zeros(shape=len(index), dtype=c.dtype), index=index
)
seen = {c.name: set() for c in columns_without_fill if c.unique}
for i in range(len(index)):
for c in columns_without_fill:
if c.unique:
for _ in range(5):
value = draw(c.elements)
if value not in seen[c.name]:
seen[c.name].add(value)
break
else:
reject()
else:
value = draw(c.elements)
try:
data[c.name][i] = value
except ValueError as err: # pragma: no cover
# This just works in Pandas 1.4 and later, but gives
# a confusing error on previous versions.
if c.dtype is None and not isinstance(
value, (float, int, str, bool, datetime, timedelta)
):
raise ValueError(
f"Failed to add value={value!r} to column "
f"{c.name} with dtype=None. Maybe passing "
"dtype=object would help?"
) from err
# Unclear how this could happen, but users find a way...
raise
for c in rewritten_columns:
if not c.fill.is_empty:
data[c.name] = draw(
series(
index=local_index_strategy,
dtype=c.dtype,
elements=c.elements,
fill=c.fill,
unique=c.unique,
)
)
return pandas.DataFrame(data, index=index)
return just_draw_columns()
else:
@st.composite
def assign_rows(draw):
index = draw(index_strategy)
result = pandas.DataFrame(
OrderedDict(
(
c.name,
pandas.Series(
np.zeros(dtype=c.dtype, shape=len(index)), dtype=c.dtype
),
)
for c in rewritten_columns
),
index=index,
)
fills = {}
any_unique = any(c.unique for c in rewritten_columns)
if any_unique:
all_seen = [set() if c.unique else None for c in rewritten_columns]
while all_seen[-1] is None:
all_seen.pop()
for row_index in range(len(index)):
for _ in range(5):
original_row = draw(rows)
row = original_row
if isinstance(row, dict):
as_list = [None] * len(rewritten_columns)
for i, c in enumerate(rewritten_columns):
try:
as_list[i] = row[c.name]
except KeyError:
try:
as_list[i] = fills[i]
except KeyError:
if c.fill.is_empty:
raise InvalidArgument(
f"Empty fill strategy in {c!r} cannot "
f"complete row {original_row!r}"
) from None
fills[i] = draw(c.fill)
as_list[i] = fills[i]
for k in row:
if k not in column_names:
                                raise InvalidArgument(
                                    f"Row {row!r} contains column {k!r} not in "
                                    f"columns {[c.name for c in rewritten_columns]!r}"
                                )
row = as_list
if any_unique:
has_duplicate = False
for seen, value in zip(all_seen, row):
if seen is None:
continue
if value in seen:
has_duplicate = True
break
seen.add(value)
if has_duplicate:
continue
row = list(try_convert(tuple, row, "draw(rows)"))
if len(row) > len(rewritten_columns):
raise InvalidArgument(
f"Row {original_row!r} contains too many entries. Has "
f"{len(row)} but expected at most {len(rewritten_columns)}"
)
while len(row) < len(rewritten_columns):
c = rewritten_columns[len(row)]
if c.fill.is_empty:
raise InvalidArgument(
f"Empty fill strategy in {c!r} cannot "
f"complete row {original_row!r}"
)
row.append(draw(c.fill))
result.iloc[row_index] = row
break
else:
reject()
return result
return assign_rows()