# Source code for djangocms_stories.managers

from __future__ import annotations

from collections import Counter

from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site
from django.db import models
from django.utils.timezone import now

from cms.models.managers import WithUserMixin


class TaggedFilterItem:
    """
    Manager mixin adding tag-based lookups.

    The methods use ``self.model`` and ``self.get_queryset()``, which this class
    does not define itself; they must be supplied by the manager class the mixin
    is combined with.
    """

    def tagged(self, other_model=None, queryset=None):
        """
        Return a queryset of the tagged items of the model, or of the items
        sharing tags with another model or queryset.
        """
        tags = self._taglist(other_model, queryset)
        return self.get_queryset().filter(post__tags__in=tags).distinct()

    def _taglist(self, other_model=None, queryset=None):
        """
        Return a list of ids of the tags common to the current model and to
        the model or queryset passed as argument.

        ``queryset`` takes precedence over ``other_model``. When neither is
        given, every tag id recorded for the current model is returned.
        """
        from taggit.models import TaggedItem

        filters = None
        if queryset is not None:
            # Collect the ids of every tag attached to any item of the queryset.
            filters = {tag.id for item in queryset.all() for tag in item.tags.all()}
        elif other_model is not None:
            filters = set(
                TaggedItem.objects.filter(content_type__model=other_model.__name__.lower()).values_list(
                    "tag_id", flat=True
                )
            )
        tags = set(
            TaggedItem.objects.filter(content_type__model=self.model.__name__.lower()).values_list("tag_id", flat=True)
        )
        if filters is not None:
            tags = tags.intersection(filters)
        return list(tags)

    def tag_list(self, other_model=None, queryset=None):
        """
        Return a queryset of the tags common to the current model and to
        the model or queryset passed as argument.
        """
        from taggit.models import Tag

        return Tag.objects.filter(id__in=self._taglist(other_model, queryset))

    def tag_list_slug(self, other_model=None, queryset=None):
        """Return the ``slug`` values of the tags selected by ``tag_list``."""
        queryset = self.tag_list(other_model, queryset)
        return queryset.values("slug")

    def tag_cloud(self, other_model=None, queryset=None, published: bool = True, site: Site | None = None):
        """
        Return the tags selected by ``_taglist`` as a list, each carrying a
        ``count`` attribute, sorted by descending count.

        :param other_model: model whose tags restrict the result (see ``_taglist``)
        :param queryset: items whose tags restrict the result (see ``_taglist``)
        :param published: when true, only tagged items whose content type is the
            current model and whose object id belongs to ``self.model.objects.all()``
            are counted
        :param site: when given, the queryset is narrowed with ``on_site(site)``
            before its tags are collected; if no queryset was passed, the
            manager's own queryset is used as the starting point
        """
        from taggit.models import TaggedItem

        if site:
            if queryset is None:
                # Nothing to narrow yet: start from the manager's queryset rather
                # than failing with an AttributeError on ``None.on_site``.
                queryset = self.get_queryset()
            queryset = queryset.on_site(site)
        tag_ids = self._taglist(other_model, queryset)
        kwargs = {}
        if published:
            kwargs = {
                "object_id__in": self.model.objects.all(),
                "content_type": ContentType.objects.get_for_model(self.model),
            }
        kwargs["tag_id__in"] = tag_ids
        counted_tags = dict(
            TaggedItem.objects.filter(**kwargs)
            .values("tag")
            .annotate(count=models.Count("tag"))
            .values_list("tag", "count")
        )
        tags = TaggedItem.tag_model().objects.filter(pk__in=counted_tags.keys())
        for tag in tags:
            tag.count = counted_tags[tag.pk]
        return sorted(tags, key=lambda x: -x.count)


class SiteQuerySet(models.QuerySet):
    """QuerySet that can be restricted to a site through the ``post__sites`` relation."""

    def on_site(self, site: Site) -> SiteQuerySet:
        """Keep the rows whose post has no site assigned or is assigned to ``site``."""
        without_site = models.Q(post__sites__isnull=True)
        assigned_to_site = models.Q(post__sites=site.pk)
        return self.filter(without_site | assigned_to_site)


class AdminSiteQuerySet(SiteQuerySet):
    """``SiteQuerySet`` extended with the ``current_content`` and ``latest_content`` lookups."""

    def current_content(self, **kwargs):
        """Return the content matching the filter given in ``kwargs``.

        If a versioning package is installed, this returns the currently valid
        content (used, e.g., to find content to be copied). Without versioning
        every page is current, so this is a plain filter.
        """
        current = self.filter(**kwargs)
        return current

    def latest_content(self, **kwargs):
        """Return the content matching the filter given in ``kwargs``.

        If a versioning package is installed, this returns the latest version,
        including discarded or unpublished page content. Without versioning
        every page content is the latest, so this is a plain filter.
        """
        latest = self.filter(**kwargs)
        return latest


class SiteManager(WithUserMixin, models.Manager):
    """Manager built from cms's ``WithUserMixin`` and Django's ``Manager``, naming ``SiteQuerySet`` as its queryset class."""

    # NOTE(review): presumably read by Django's default Manager.get_queryset(), so
    # querysets obtained from this manager offer on_site() — confirm for the
    # Django version in use.
    _queryset_class = SiteQuerySet


class AdminManager(models.Manager):
    """Manager naming ``AdminSiteQuerySet`` as its queryset class and forwarding the content lookups to it."""

    _queryset_class = AdminSiteQuerySet

    def current_content(self, **kwargs):
        """Syntactic sugar: admin_manager.current_content()"""
        return self.get_queryset().current_content(**kwargs)

    def latest_content(self, **kwargs):
        """Syntactic sugar: admin_manager.latest_content()"""
        return self.get_queryset().latest_content(**kwargs)
class GenericDateTaggedManager(TaggedFilterItem, models.Manager):
    """Manager combining the tag helpers of ``TaggedFilterItem`` with site filtering and a per-month summary."""

    use_for_related_fields = True
    # Field used to place an item in a month; the fallback field is used when the
    # first one is empty.
    start_date_field = "date_featured"
    fallback_date_field = "date_modified"
    queryset_class = SiteQuerySet

    def get_queryset(self, *args, **kwargs):
        """Return a new ``queryset_class`` instance for this manager's model and database.

        Positional and keyword arguments are accepted but not used.
        """
        return self.queryset_class(model=self.model, using=self._db, hints=self._hints)

    def on_site(self, site=None):
        """Return the manager's queryset restricted to ``site``.

        :param site: site to filter on; when omitted, the current site as
            returned by ``Site.objects.get_current()`` is used
        """
        if site is None:
            # The queryset's on_site() reads ``site.pk``, so the default of None
            # has to be resolved to a real site first.
            site = Site.objects.get_current()
        return self.get_queryset().on_site(site)

    def get_months(self, queryset=None, site: Site | None = None):
        """
        Get months with aggregate count (how many posts are in the month).

        :param queryset: items to summarise; defaults to the manager's queryset
        :param site: when given, the queryset is narrowed with ``on_site(site)``
        :return: list of ``{"date": ..., "count": ...}`` dicts, newest month
            first; ``date`` is the current time moved to the first day of the
            month being reported
        """
        if queryset is None:
            queryset = self.get_queryset()
        if site:
            queryset = queryset.on_site(site)
        dates_qs = queryset.values_list(self.start_date_field, self.fallback_date_field)

        date_counter = Counter()
        for start_date, fallback_date in dates_qs:
            current_date = start_date or fallback_date
            if current_date is None:
                # Neither date is set: the item cannot be placed in any month.
                continue
            date_counter[current_date.year, current_date.month] += 1

        # Take the timestamp once so every entry shares the same time of day.
        reference = now()
        return [
            {"date": reference.replace(year=year, month=month, day=1), "count": date_counter[year, month]}
            for year, month in sorted(date_counter, reverse=True)
        ]