Source code for pymodm.base.models

# Copyright 2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from bson.dbref import DBRef
from bson.son import SON

from pymodm import errors
from pymodm.base.options import MongoOptions
from pymodm.common import (
    register_document, get_document, validate_mapping,
    validate_list_tuple_or_none, validate_boolean_or_none,
    validate_boolean, snake_case)
from pymodm.compat import with_metaclass
from pymodm.context_managers import no_auto_dereference
from pymodm.errors import ValidationError, InvalidModel, OperationError
from pymodm.fields import ObjectIdField
from pymodm.manager import Manager


__all__ = ['MongoModel', 'EmbeddedMongoModel']


class MongoModelMetaclass(type):
    """Base metaclass for all Models.

    For every class that inherits from an existing Model class, ``__new__``
    attaches an options object as ``_mongometa``, routes each class attribute
    through :meth:`add_to_class`, copies inheritable fields from parent
    Models, records the class hierarchy in ``_subclasses``, chooses a default
    collection name, creates per-model ``DoesNotExist`` and
    ``MultipleObjectsReturned`` exception types and finally registers the
    class with ``register_document``.
    """

    def __new__(mcls, name, bases, attrs):
        """Create a new Model class.

        :parameters:
          - `name`: The name of the class being created.
          - `bases`: The direct base classes of the new class.
          - `attrs`: The class namespace. ``_options_class`` is popped from
            it if present.

        :returns: The new class. If none of `bases` was itself created by
          this metaclass, the class is built by ``type.__new__`` with no
          Model-specific setup.

        :raises InvalidModel: if any Model ancestor has ``final`` set in its
          metadata.
        """
        model_parents = [
            base for base in bases if isinstance(base, MongoModelMetaclass)]
        # Only perform Model initialization steps if the class has inherited
        # from a Model base class (i.e. MongoModel/EmbeddedMongoModel).
        if not model_parents:
            return type.__new__(mcls, name, bases, attrs)

        # Create the class with a minimal namespace; everything else in attrs
        # is added below through add_to_class so that objects defining
        # contribute_to_class get a chance to run. '__classcell__' is passed
        # along because type.__new__ needs it for zero-argument super().
        new_attrs = {'__module__': attrs['__module__']}
        if '__classcell__' in attrs:
            new_attrs['__classcell__'] = attrs['__classcell__']
        new_class = type.__new__(mcls, name, bases, new_attrs)

        # User-defined or inherited metadata
        meta = attrs.get('Meta', getattr(new_class, 'Meta', None))
        # Allow the options class to be pluggable.
        # Pop it from attrs, since it's not useful in the final class.
        options_class = attrs.pop('_options_class', MongoOptions)
        options = options_class(meta)

        # Let the Options object take care of merging relevant options.
        new_class.add_to_class('_mongometa', options)

        # Add all attributes to class.
        for attr in attrs:
            new_class.add_to_class(attr, attrs[attr])

        def should_inherit_field(parent_class, field):
            """Tell whether `field` of `parent_class` is copied to new_class."""
            # Never shadow fields defined on the new class.
            if field.attname in new_class._mongometa.fields_attname_dict:
                return False
            # Never inherit an implicit primary key.
            if field.primary_key and parent_class._mongometa.implicit_id:
                return False
            return True

        # Also add fields from parents into the metadata.
        for base in model_parents:
            if hasattr(base, '_mongometa'):
                parent_fields = base._mongometa.get_fields()
                for field in parent_fields:
                    if should_inherit_field(base, field):
                        new_class.add_to_class(field.attname, field)

        # Discover and store class hierarchy for later.
        class_name = new_class._mongometa.object_name
        new_class._subclasses = set([class_name])
        flattened_bases = new_class._get_bases(bases)
        for base in flattened_bases:
            if base._mongometa.final:
                raise InvalidModel(
                    'Cannot extend class %s, '
                    'because it has been declared final.'
                    % base._mongometa.object_name)
            # Every Model ancestor learns about the new subclass by name.
            base._subclasses.add(class_name)

        # Set the default collection name.
        if new_class._mongometa.collection_name is None:
            # If this class extends another custom MongoModel, use the same
            # collection.
            if flattened_bases:
                # NOTE(review): flattened_bases is a set, so when there are
                # several Model ancestors the one that supplies the collection
                # name depends on set iteration order -- confirm this is
                # intended.
                parent_cls = next(iter(flattened_bases))
                parent_collection_name = parent_cls._mongometa.collection_name
                new_class._mongometa.collection_name = parent_collection_name
            else:
                new_class._mongometa.collection_name = snake_case(name)

        # Create model-specific Exception types.
        for exc_type in (errors.DoesNotExist, errors.MultipleObjectsReturned):
            exc_name = exc_type.__name__
            # Derive from the same-named exception of each direct base that
            # has one, falling back to the generic type from pymodm.errors.
            parent_types = tuple(
                getattr(base, exc_name) for base in bases
                if hasattr(base, exc_name))
            model_exc = type(
                exc_name,
                parent_types or (exc_type,),
                {'__module__': attrs['__module__']})
            new_class.add_to_class(exc_name, model_exc)

        # Add class to the registry.
        register_document(new_class)

        return new_class

    @staticmethod
    def _get_bases(bases):
        """Collect the Model classes reachable from `bases`.

        A base is included when it has a ``_mongometa`` attribute; its own
        ``__bases__`` are then searched recursively. Bases without
        ``_mongometa`` are skipped and not searched further.

        :returns: A set of classes.
        """
        found_bases = set()
        for base in bases:
            # Is it an ODM class?
            if hasattr(base, '_mongometa'):
                found_bases.add(base)
                found_bases.update(base._get_bases(base.__bases__))
        return found_bases

    def add_to_class(cls, name, value):
        """Add an attribute to this class.

        If the value defines a `contribute_to_class` method, it will be run
        with this class and the given name as its arguments.
        """
        # Check if value is an object and defines contribute_to_class.
        if hasattr(value, 'contribute_to_class'):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)


class TopLevelMongoModelMetaclass(MongoModelMetaclass):
    """Metaclass for all top-level (i.e. not embedded) Models.

    In addition to the work done by the parent metaclass, it makes sure each
    Model class has a primary key field and a default QuerySet manager.
    """

    def __new__(mcls, name, bases, attrs):
        """Create a top-level Model class.

        ``_manager_class`` may be supplied in `attrs` to choose the Manager
        type that is instantiated when the class declares no manager of its
        own; it defaults to ``Manager``.
        """
        # The manager class is pluggable. Take it out of attrs before the
        # class is built so that it never becomes an attribute of the class.
        manager_factory = attrs.pop('_manager_class', Manager)

        model = super(TopLevelMongoModelMetaclass, mcls).__new__(
            mcls, name, bases, attrs)

        # A class without '_mongometa' did not go through Model setup
        # (conceptually the same as 'model is MongoModelBase'), so there is
        # nothing more to do for it.
        if hasattr(model, '_mongometa'):
            # No primary key field declared: install an implicit '_id'.
            if model._mongometa.pk is None:
                implicit_pk = ObjectIdField(primary_key=True)
                model._mongometa.implicit_id = True
                model.add_to_class('_id', implicit_pk)

            # Prefer a manager declared on the class; otherwise create one
            # and expose it as 'objects'.
            default_manager = model._find_manager()
            if default_manager is None:
                default_manager = manager_factory()
                model.add_to_class('objects', default_manager)
            model._mongometa.default_manager = default_manager

        return model

    def _find_manager(cls):
        """Return the Manager on this class with the lowest creation_order.

        Only names present in ``cls.__dict__`` are considered. Returns
        ``None`` when none of them is a ``Manager`` instance.
        """
        earliest = None
        for attr_name in cls.__dict__:
            candidate = getattr(cls, attr_name)
            if not isinstance(candidate, Manager):
                continue
            if (earliest is None or
                    earliest.creation_order > candidate.creation_order):
                earliest = candidate
        return earliest


class MongoModelBase(object):
    """Base class for MongoModel and EmbeddedMongoModel.

    Field values are kept in ``self._data`` (a ``_LazyDecoder``) and default
    values in ``self._defaults``. Field definitions and options are read from
    the class's ``_mongometa`` object.
    """

    def __init__(self, *args, **kwargs):
        """Create an instance, optionally setting field values.

        :parameters:
          - `args`: Values bound to fields in the order the fields are
            defined on the Model. An implicit primary key is skipped.
          - `kwargs`: Values keyed by field attribute name. ``pk`` is
            accepted as an alias for the primary key field.

        :raises ValueError: if there are more positional arguments than
          fields, if a field is given both positionally and by keyword, or if
          a keyword does not name a field.
        """
        # Initialize dicts for saving python/mongo field values.
        self._data = _LazyDecoder()

        # Initialize dict for saving field default values.
        self._defaults = {}

        # Turn ordered arguments into keyword arguments.
        if args:
            len_args = len(args)
            # Get field names in the order they are defined on the Model.
            all_field_names = (
                field.attname for field in self._mongometa.get_fields()
                if not (field.primary_key and self._mongometa.implicit_id))
            len_all_fields = (len(self._mongometa.fields_dict) -
                              int(self._mongometa.implicit_id))
            if len_args > len_all_fields:
                raise ValueError(
                    'Got %d arguments for only %d fields.'
                    % (len_args, len_all_fields))
            for next_field_name, value in zip(all_field_names, args):
                if next_field_name in kwargs:
                    raise ValueError(
                        'Field %s specified more than once '
                        'in constructor for %s.'
                        % (next_field_name, self.__class__.__name__))
                kwargs[next_field_name] = value

        # Set values for specified fields
        field_names = set(
            field.attname for field in self._mongometa.get_fields())
        for field in kwargs:
            if field == 'pk':
                setattr(self, self._mongometa.pk.attname, kwargs[field])
            elif field not in field_names:
                raise ValueError(
                    'Unrecognized field name %r' % field)
            else:
                setattr(self, field, kwargs[field])

    def _find_referenced_objects(self, value):
        """Find all referenced objects in the given object.

        Top-level models are returned as they are, lists are searched item
        by item, and any other model (i.e. an embedded one) is searched
        through each of its set fields. Other values yield nothing.

        :returns: A list of ``TopLevelMongoModel`` instances.
        """
        references = []
        if isinstance(value, TopLevelMongoModel):
            references.append(value)
        elif isinstance(value, list):
            for item in value:
                references.extend(self._find_referenced_objects(item))
        elif isinstance(value, MongoModelBase):
            # EmbeddedMongoModel
            for field_name in value:
                field_value = getattr(value, field_name)
                references.extend(value._find_referenced_objects(field_value))
        return references

    def _set_attributes(self, dict):
        """Set this object's attributes from a dict.

        All current values and defaults are discarded first. Keys of `dict`
        are MongoDB field names; ``_cls`` is skipped, and ``_id`` is mapped to
        the primary key field when the primary key is not implicit.

        :raises ValueError: for a key that matches no field, unless
          ``ignore_unknown_fields`` is set in the metadata.
        """
        # NOTE: the parameter shadows the ``dict`` builtin; the name is kept
        # so existing callers are unaffected.
        self._data.clear()
        self._defaults.clear()
        field_names = {
            field.mongo_name: field.attname
            for field in self._mongometa.get_fields()
        }
        ignore_unknown = self._mongometa.ignore_unknown_fields
        for field in dict:
            if field == '_cls':
                continue
            elif field == '_id' and not self._mongometa.implicit_id:
                self._data.set_mongo_value(
                    self._mongometa.pk.attname, dict[field])
            elif field in field_names:
                self._data.set_mongo_value(
                    field_names[field], dict[field])
            elif not ignore_unknown:
                raise ValueError(
                    'Unrecognized field name %r' % field)

    @classmethod
    def from_document(cls, document):
        """Construct an instance of this class from the given document.

        If the document has a ``_cls`` key, the class registered under that
        name is instantiated instead of `cls`; it must be a subclass of `cls`.

        :parameters:
          - `document`: A Python dictionary describing a MongoDB document.
            Keys within the document must be named according to each model
            field's `mongo_name` attribute, rather than the field's Python
            name.

        :raises TypeError: if the class named by ``_cls`` is not a subclass
          of `cls`.

        """
        dct = validate_mapping('document', document)
        doc_cls = cls
        cls_name = dct.get('_cls')
        if cls_name is not None:
            doc_cls = get_document(cls_name)
            if not issubclass(doc_cls, cls):
                # Required parent first, offending class second, matching
                # the wording of the message.
                raise TypeError('A document\'s _cls field must be '
                                'a subclass of the %s, but %s is not.'
                                % (cls, doc_cls))

        inst = doc_cls()
        inst._set_attributes(dct)
        return inst

    def to_son(self):
        """Get this Model back as a :class:`~bson.son.SON` object.

        Fields that are undefined on this instance are left out. Blank values
        are stored as they are; all others are converted with the field's
        ``to_mongo``. A ``_cls`` key holding the model's object name is added
        unless the model is final.

        :returns: SON representing this object as a MongoDB document.

        """
        son = SON()
        with no_auto_dereference(self):
            for field in self._mongometa.get_fields():
                if field.is_undefined(self):
                    continue
                value = self._data.get_python_value(
                    field.attname, field.to_python
                )
                if field.is_blank(value):
                    son[field.mongo_name] = value
                else:
                    son[field.mongo_name] = field.to_mongo(value)
        # Add metadata about our type, so that we instantiate the right class
        # when retrieving from MongoDB.
        if not self._mongometa.final:
            son['_cls'] = self._mongometa.object_name
        return son

    def clean(self):
        """Run custom validation rules when
        :meth:`~pymodm.MongoModel.full_clean` is called.

        This is an abstract method that can be overridden to validate the
        :class:`~pymodm.MongoModel` instance as a whole. Custom field validation
        is better done by passing a validator to the `validators` parameter in a
        field's constructor.

        example::

            class Vacation(MongoModel):
                destination = fields.CharField(choices=('HAWAII', 'DETROIT'))
                travel_method = fields.CharField(
                    choices=('PLANE', 'CAR', 'BOAT'))

                def clean(self):
                    # Custom validation that requires looking at several fields.
                    if (self.destination == 'HAWAII' and
                            self.travel_method == 'CAR'):
                        raise ValidationError('Cannot travel to Hawaii by car.')

        """
        pass

    def clean_fields(self, exclude=None):
        """Validate the values of all fields.

        This method will raise a :exc:`~pymodm.errors.ValidationError` that
        describes all issues with each field, if any field fails to pass
        validation.

        :parameters:
          - `exclude`: A list of fields to exclude from validation.

        :raises ValidationError: wrapping a dict that maps each failing
          field's attribute name to a list of errors.

        """
        exclude = validate_list_tuple_or_none('exclude', exclude)
        exclude = set(exclude) if exclude else set()
        error_dict = {}
        for field in self._mongometa.get_fields():
            if field.attname in exclude:
                continue
            try:
                field_value = field.value_from_object(self)
                field_empty = field.is_undefined(self)
                if field_empty and field.required:
                    error_dict[field.attname] = [ValidationError(
                        'field is required.')]
                elif not field_empty:
                    field.validate(field_value)
            except Exception as exc:
                # Deliberately broad: any failure while reading or validating
                # a field is reported against that field, so that all
                # problems are collected before raising.
                error_dict[field.attname] = [ValidationError(exc)]
        if error_dict:
            raise ValidationError(error_dict)

    def full_clean(self, exclude=None):
        """Validate this :class:`~pymodm.MongoModel`.

        This method calls :meth:`~pymodm.MongoModel.clean_fields` to validate
        the values of all fields then :meth:`~pymodm.MongoModel.clean` to
        apply any custom validation rules to the model as a whole.

        :parameters:
          - `exclude`: A list of fields to exclude from validation.

        """
        # Field validation runs with automatic dereferencing switched off;
        # clean() runs outside of that context.
        with no_auto_dereference(self):
            self.clean_fields(exclude=exclude)
        self.clean()

    def __iter__(self):
        """Iterate over the names of the fields that have a value."""
        return iter(self._data)

    def __str__(self):
        return '<%s object>' % self.__class__.__name__

    def __repr__(self):
        """Return ``ClassName(field=value, ...)`` for the fields that are set."""
        attrs = ('%s=%r' % (fname, getattr(self, fname)) for fname in self)
        return '%s(%s)' % (self.__class__.__name__, ', '.join(attrs))

    def __eq__(self, other):
        """Compare stored field data with another model's."""
        if isinstance(other, MongoModelBase):
            return self._data == other._data
        return NotImplemented


class TopLevelMongoModel(MongoModelBase):
    """Behaviour shared by all top-level (non-embedded) models.

    Adds primary key access, persistence (``save``, ``delete``,
    ``refresh_from_db``) and primary-key based equality on top of
    ``MongoModelBase``.
    """

    @classmethod
    def register_delete_rule(cls, related_model, related_field, rule):
        """Specify what to do when an instance of this class is deleted.

        :parameters:
          - `related_model`: The class that references this class.
          - `related_field`: The name of the field in ``related_model`` that
            references this class.
          - `rule`: The delete rule. See
            :class:`~pymodm.fields.ReferenceField` for details.

        """
        cls._mongometa.delete_rules[(related_model, related_field)] = rule

    @property
    def pk(self):
        """An alias for the primary key (called `_id` in MongoDB)."""
        if self._mongometa.pk is not None:
            return getattr(self, self._mongometa.pk.attname)
        return None

    @pk.setter
    def pk(self, value):
        if self._mongometa.pk is None:
            raise ValueError('No primary key set for %s'
                             % self._mongometa.object_name)
        setattr(self, self._mongometa.pk.attname, value)

    @property
    def _qs(self):
        """A QuerySet matching this instance's document by primary key.

        Returns ``None`` while the primary key is undefined. The QuerySet is
        built on every access, so it always reflects the current primary key.
        """
        # The previous implementation tried to cache the QuerySet but tested
        # for it with hasattr(self, '__queryset'). A string is not
        # name-mangled while ``self.__queryset`` is, so the cache was reset on
        # every access. The dead cache is removed; behaviour is unchanged.
        if self._mongometa.pk.is_undefined(self):
            return None
        return self.__class__._mongometa.default_manager.raw(
            {'_id': self._mongometa.pk.to_mongo(self.pk)})

    def save(self, cascade=None, full_clean=True, force_insert=False):
        """Save this document into MongoDB.

        If there is no value for the primary key on this Model instance, the
        instance will be inserted into MongoDB. Otherwise, the entire document
        will be replaced with this version (upserting if necessary).

        :parameters:
          - `cascade`: If ``True``, all dereferenced MongoModels contained in
            this Model instance will also be saved. If ``None`` (the
            default), the ``cascade`` metadata option decides; ``False``
            disables cascading regardless of the metadata.
          - `full_clean`: If ``True``, the
            :meth:`~pymodm.MongoModel.full_clean` method
            will be called before persisting this object.
          - `force_insert`: If ``True``, always do an insert instead of a
            replace. In this case, `save` will raise
            :class:`~pymongo.errors.DuplicateKeyError` if a document already
            exists with the same primary key.

        :returns: This object, with the `pk` property filled in if it wasn't
                  already.

        """
        cascade = validate_boolean_or_none('cascade', cascade)
        full_clean = validate_boolean('full_clean', full_clean)
        force_insert = validate_boolean('force_insert', force_insert)
        if full_clean:
            self.full_clean()
        if cascade or (self._mongometa.cascade and cascade is not False):
            for field_name in self:
                for referenced_object in self._find_referenced_objects(
                        getattr(self, field_name)):
                    referenced_object.save()
        if force_insert or self._mongometa.pk.is_undefined(self):
            result = self._mongometa.collection.insert_one(self.to_son())
            self.pk = result.inserted_id
        else:
            self._mongometa.collection.replace_one(
                {'_id': self._mongometa.pk.to_mongo(self.pk)},
                self.to_son(), upsert=True)
        return self

    def delete(self):
        """Delete this object from MongoDB."""
        self._qs.delete()

    def is_valid(self):
        """Return ``True`` if the data in this Model is valid.

        This method runs the
        :meth:`~pymodm.MongoModel.full_clean`
        method and returns ``True`` if no ValidationError was raised.

        """
        try:
            self.full_clean()
        except ValidationError:
            return False
        return True

    def refresh_from_db(self, fields=None):
        """Reload this object from the database, overwriting local field values.

        :parameters:
          - `fields`: An iterable of fields to reload. Defaults to all fields.

        :returns: This object.

        :raises OperationError: if this object has no primary key value yet.

        .. warning:: This method will reload the object from the database,
           possibly with only a subset of fields. Calling
           :meth:`~pymodm.MongoModel.save` after this may revert or unset
           fields in the database.

        """
        fields = validate_list_tuple_or_none('fields', fields)
        # Build the QuerySet once; the property creates a new one on each
        # access.
        queryset = self._qs
        if queryset is None:
            raise OperationError('Cannot refresh from db before saving.')
        qs = queryset.values()
        if fields:
            qs = qs.only(*fields)
        db_inst = qs.first()

        self._set_attributes(db_inst)
        return self

    def __eq__(self, other):
        """Compare by primary key, falling back to identity.

        With a primary key set, this object equals another instance of the
        same class (or a subclass) with an equal primary key, or a
        :class:`~bson.dbref.DBRef` whose ``id`` equals it. In every other
        case only the very same object is equal.
        """
        if self.pk is not None:
            if isinstance(other, self.__class__) and other.pk is not None:
                return self.pk == other.pk
            elif isinstance(other, DBRef):
                return self.pk == other.id
        return self is other


class MongoModel(
        with_metaclass(TopLevelMongoModelMetaclass, TopLevelMongoModel)):
    """Base class for all top-level models.

    A MongoModel definition typically includes a number of field instances
    and possibly a ``Meta`` class attribute that provides metadata or settings
    specific to the model.

    MongoModels can be instantiated either with positional or keyword
    arguments. Positional arguments are bound to the fields in the order the
    fields are defined on the model. Keyword argument names are the same as
    the names of the fields::

        from pymongo.read_preferences import ReadPreference

        class User(MongoModel):
            email = fields.EmailField(primary_key=True)
            name = fields.CharField()

            class Meta:
                # Read from secondaries.
                read_preference = ReadPreference.SECONDARY

        # Instantiate User using positional arguments:
        jane = User('jane@janesemailaddress.net', 'Jane')
        # Keyword arguments:
        roy = User(name='Roy', email='roy@roysemailaddress.net')

    Note that :func:`~pymodm.connection.connect` has to be called (defining
    the respective connection alias, if any) before any
    :class:`~pymodm.MongoModel` can be used with that alias. If ``indexes`` is
    defined on ``Meta``, then this has to be before the MongoModel class is
    evaluated.

    .. _metadata-attributes:

    The following metadata attributes are available:

      - `connection_alias`: The alias of the connection to use for the model.
      - `collection_name`: The name of the collection to use. By default, this
        is the same name as the model, converted to snake case.
      - `codec_options`: An instance of
        :class:`~bson.codec_options.CodecOptions` to use for reading and
        writing documents of this model type.
      - `final`: Whether to restrict inheritance on this model. If ``True``,
        the ``_cls`` field will not be stored in the document. ``False`` by
        default.
      - `cascade`: If ``True``, save all :class:`~pymodm.MongoModel` instances
        this object references when :meth:`~pymodm.MongoModel.save` is called
        on this object.
      - `read_preference`: The
        :class:`~pymongo.read_preferences.ReadPreference` to use when reading
        documents.
      - `read_concern`: The :class:`~pymongo.read_concern.ReadConcern` to use
        when reading documents.
      - `write_concern`: The :class:`~pymongo.write_concern.WriteConcern` to
        use for write operations.
      - `indexes`: This is a list of :class:`~pymongo.operations.IndexModel`
        instances that describe the indexes that should be created for this
        model. Indexes are created when the class definition is evaluated.
      - `ignore_unknown_fields`: If ``True``, fields that aren't defined in
        the model will be ignored when parsing documents from MongoDB, such as
        in :meth:`~pymodm.MongoModel.from_document`. By default, unknown
        fields will cause a ``ValueError`` to be raised. Note that with this
        option enabled, calling :meth:`~pymodm.MongoModel.save` will erase
        these fields for that model instance.

    .. note:: Creating an instance of MongoModel does not create a document in
              the database.

    """
class EmbeddedMongoModel(
        with_metaclass(MongoModelMetaclass, MongoModelBase)):
    """Base class for models that represent embedded documents."""
class _LazyDecoder(object):
    """Store field values, converting between MongoDB and Python form lazily.

    A value is held either in ``_mongo_data`` (as given to
    :meth:`set_mongo_value`) or in ``_python_data`` (as given to
    :meth:`set_python_value`, or once converted by :meth:`get_python_value`),
    never in both. ``_members`` holds every key that currently has a value.
    """

    def __init__(self):
        self._mongo_data = {}
        self._python_data = {}
        self._members = set()

    def __contains__(self, item):
        return item in self._members

    def __iter__(self):
        return iter(self._members)

    def __eq__(self, other):
        """Compare keys and stored values with another ``_LazyDecoder``.

        Returns ``NotImplemented`` for any other type instead of failing with
        ``AttributeError``.
        """
        if not isinstance(other, _LazyDecoder):
            return NotImplemented
        if self._members != other._members:
            return False
        # NOTE(review): values are compared in whichever form each side
        # currently stores, so the same value held as a MongoDB value on one
        # side and a Python value on the other only compares equal if the two
        # forms are themselves equal.
        for key in self:
            if self._get_raw_value(key) != other._get_raw_value(key):
                return False
        return True

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def remove(self, key):
        """Forget `key`; a no-op if it has no value."""
        self._mongo_data.pop(key, None)
        self._python_data.pop(key, None)
        self._members.discard(key)

    def clear(self):
        """Forget all keys and values."""
        self._mongo_data.clear()
        self._python_data.clear()
        self._members.clear()

    def _get_raw_value(self, key):
        """Return the stored value for `key` without converting it.

        :raises KeyError: if `key` has no value.
        """
        try:
            return self._python_data[key]
        except KeyError:
            return self._mongo_data[key]

    def get_mongo_value(self, key, to_mongo):
        """Return the MongoDB form of the value for `key`.

        A Python value is converted with `to_mongo` on each call; the result
        is not stored. ``None`` is returned unconverted.

        :raises KeyError: if `key` has no value.
        """
        try:
            return self._mongo_data[key]
        except KeyError:
            pvalue = self._python_data[key]
            if pvalue is None:
                return pvalue
            return to_mongo(pvalue)

    def set_mongo_value(self, key, value):
        """Store `value` as the MongoDB form for `key`."""
        self._python_data.pop(key, None)
        self._mongo_data[key] = value
        self._members.add(key)

    def get_python_value(self, key, to_python):
        """Return the Python form of the value for `key`.

        A MongoDB value is converted with `to_python` on first access and the
        result replaces it, so later calls return the converted value without
        calling `to_python` again. ``None`` is stored unconverted.

        :raises KeyError: if `key` has no value.
        """
        try:
            return self._python_data[key]
        except KeyError:
            mvalue = self._mongo_data.pop(key)
            if mvalue is None:
                self._python_data[key] = mvalue
                return mvalue
            pvalue = to_python(mvalue)
            self._python_data[key] = pvalue
            return pvalue

    def set_python_value(self, key, value):
        """Store `value` as the Python form for `key`."""
        self._mongo_data.pop(key, None)
        self._python_data[key] = value
        self._members.add(key)