cadquery-freecad-module/Libs/tinydb/queries.py

377 lines
10 KiB
Python

"""
Contains the querying interface.
Starting with :class:`~tinydb.queries.Query` you can construct complex
queries:
>>> ((where('f1') == 5) & (where('f2') != 2)) | where('s').matches(r'^\w+$')
(('f1' == 5) and ('f2' != 2)) or ('s' ~= ^\w+$ )
Queries are executed by using the ``__call__``:
>>> q = where('val') == 5
>>> q({'val': 5})
True
>>> q({'val': 1})
False
"""
import re
import sys
from .utils import catch_warning, freeze
__all__ = ('Query', 'where')
def is_sequence(obj):
return hasattr(obj, '__iter__')
class QueryImpl(object):
"""
A query implementation.
This query implementation wraps a test function which is run when the
query is evaluated by calling the object.
Queries can be combined with logical and/or and modified with logical not.
"""
def __init__(self, test, hashval):
self.test = test
self.hashval = hashval
def __call__(self, value):
return self.test(value)
def __hash__(self):
return hash(self.hashval)
def __repr__(self):
return 'QueryImpl{}'.format(self.hashval)
def __eq__(self, other):
return self.hashval == other.hashval
# --- Query modifiers -----------------------------------------------------
def __and__(self, other):
# We use a frozenset for the hash as the AND operation is commutative
# (a & b == b & a)
return QueryImpl(lambda value: self(value) and other(value),
('and', frozenset([self.hashval, other.hashval])))
def __or__(self, other):
# We use a frozenset for the hash as the OR operation is commutative
# (a | b == b | a)
return QueryImpl(lambda value: self(value) or other(value),
('or', frozenset([self.hashval, other.hashval])))
def __invert__(self):
return QueryImpl(lambda value: not self(value),
('not', self.hashval))
class Query(object):
"""
TinyDB Queries.
Allows to build queries for TinyDB databases. There are two main ways of
using queries:
1) ORM-like usage:
>>> User = Query()
>>> db.search(User.name == 'John Doe')
>>> db.search(User['logged-in'] == True)
2) Classical usage:
>>> db.search(where('value') == True)
Note that ``where(...)`` is a shorthand for ``Query(...)`` allowing for
a more fluent syntax.
Besides the methods documented here you can combine queries using the
binary AND and OR operators:
>>> db.search(where('field1').exists() & where('field2') == 5) # Binary AND
>>> db.search(where('field1').exists() | where('field2') == 5) # Binary OR
Queries are executed by calling the resulting object. They expect to get
the document to test as the first argument and return ``True`` or
``False`` depending on whether the documents matches the query or not.
"""
def __init__(self):
self._path = []
def __getattr__(self, item):
query = Query()
query._path = self._path + [item]
return query
__getitem__ = __getattr__
def _generate_test(self, test, hashval):
"""
Generate a query based on a test function.
:param test: The test the query executes.
:param hashval: The hash of the query.
:return: A :class:`~tinydb.queries.QueryImpl` object
"""
if not self._path:
raise ValueError('Query has no path')
def impl(value):
try:
# Resolve the path
for part in self._path:
value = value[part]
except (KeyError, TypeError):
return False
else:
return test(value)
return QueryImpl(impl, hashval)
def __eq__(self, rhs):
"""
Test a dict value for equality.
>>> Query().f1 == 42
:param rhs: The value to compare against
"""
if sys.version_info <= (3, 0): # pragma: no cover
# Special UTF-8 handling on Python 2
def test(value):
with catch_warning(UnicodeWarning):
try:
return value == rhs
except UnicodeWarning:
# Dealing with a case, where 'value' or 'rhs'
# is unicode and the other is a byte string.
if isinstance(value, str):
return value.decode('utf-8') == rhs
elif isinstance(rhs, str):
return value == rhs.decode('utf-8')
else: # pragma: no cover
def test(value):
return value == rhs
return self._generate_test(
lambda value: test(value),
('==', tuple(self._path), freeze(rhs))
)
def __ne__(self, rhs):
"""
Test a dict value for inequality.
>>> Query().f1 != 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value != rhs,
('!=', tuple(self._path), freeze(rhs))
)
def __lt__(self, rhs):
"""
Test a dict value for being lower than another value.
>>> Query().f1 < 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value < rhs,
('<', tuple(self._path), rhs)
)
def __le__(self, rhs):
"""
Test a dict value for being lower than or equal to another value.
>>> where('f1') <= 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value <= rhs,
('<=', tuple(self._path), rhs)
)
def __gt__(self, rhs):
"""
Test a dict value for being greater than another value.
>>> Query().f1 > 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value > rhs,
('>', tuple(self._path), rhs)
)
def __ge__(self, rhs):
"""
Test a dict value for being greater than or equal to another value.
>>> Query().f1 >= 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value >= rhs,
('>=', tuple(self._path), rhs)
)
def exists(self):
"""
Test for a dict where a provided key exists.
>>> Query().f1.exists()
"""
return self._generate_test(
lambda _: True,
('exists', tuple(self._path))
)
def matches(self, regex):
"""
Run a regex test against a dict value (whole string has to match).
>>> Query().f1.matches(r'^\w+$')
:param regex: The regular expression to use for matching
"""
return self._generate_test(
lambda value: re.match(regex, value),
('matches', tuple(self._path), regex)
)
def search(self, regex):
"""
Run a regex test against a dict value (only substring string has to
match).
>>> Query().f1.search(r'^\w+$')
:param regex: The regular expression to use for matching
"""
return self._generate_test(
lambda value: re.search(regex, value),
('search', tuple(self._path), regex)
)
def test(self, func, *args):
"""
Run a user-defined test function against a dict value.
>>> def test_func(val):
... return val == 42
...
>>> Query().f1.test(test_func)
:param func: The function to call, passing the dict as the first
argument
:param args: Additional arguments to pass to the test function
"""
return self._generate_test(
lambda value: func(value, *args),
('test', tuple(self._path), func, args)
)
def any(self, cond):
"""
Check if a condition is met by any document in a list,
where a condition can also be a sequence (e.g. list).
>>> Query().f1.any(Query().f2 == 1)
Matches::
{'f1': [{'f2': 1}, {'f2': 0}]}
>>> Query().f1.any([1, 2, 3])
Matches::
{'f1': [1, 2]}
{'f1': [3, 4, 5]}
:param cond: Either a query that at least one document has to match or
a list of which at least one document has to be contained
in the tested document.
"""
if callable(cond):
def _cmp(value):
return is_sequence(value) and any(cond(e) for e in value)
else:
def _cmp(value):
return is_sequence(value) and any(e in cond for e in value)
return self._generate_test(
lambda value: _cmp(value),
('any', tuple(self._path), freeze(cond))
)
def all(self, cond):
"""
Check if a condition is met by all documents in a list,
where a condition can also be a sequence (e.g. list).
>>> Query().f1.all(Query().f2 == 1)
Matches::
{'f1': [{'f2': 1}, {'f2': 1}]}
>>> Query().f1.all([1, 2, 3])
Matches::
{'f1': [1, 2, 3, 4, 5]}
:param cond: Either a query that all documents have to match or a list
which has to be contained in the tested document.
"""
if callable(cond):
def _cmp(value):
return is_sequence(value) and all(cond(e) for e in value)
else:
def _cmp(value):
return is_sequence(value) and all(e in value for e in cond)
return self._generate_test(
lambda value: _cmp(value),
('all', tuple(self._path), freeze(cond))
)
def one_of(self, items):
"""
Check if the value is contained in a list or generator.
>>> Query().f1.one_of(['value 1', 'value 2'])
:param items: The list of items to check with
"""
return self._generate_test(
lambda value: value in items,
('one_of', tuple(self._path), freeze(items))
)
def where(key):
return Query()[key]