Skip to content
Snippets Groups Projects
Commit 278ce198 authored by Valentin Rigal's avatar Valentin Rigal
Browse files

Split filter_rights function

parent 92f4d352
No related branches found
No related tags found
No related merge requests found
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.db.models import IntegerField, Q, Value, functions
from django.db.models.query_utils import DeferredAttribute
from django.db.models import Q
from django.shortcuts import get_object_or_404
from django.views.decorators.cache import cache_page
from rest_framework.exceptions import APIException, ValidationError
......@@ -14,89 +13,29 @@ from arkindex.project.elastic import ESQuerySet
from arkindex.project.openapi import AutoSchema, SearchAutoSchema
from arkindex.project.pagination import CustomCursorPagination
from arkindex.users.models import Role
from arkindex.users.utils import check_level_param, filter_rights
class ACLMixin(object):
"""
Access control mixin using the generic Right table.
"""
_user = None
mixin_order_by_fields = ()
def __init__(self, user=None, order_by_fields=()):
def __init__(self, user=None):
self._user = user
self.mixin_order_by_fields = order_by_fields
@property
def user(self):
return self._user or self.request.user
def _check_level(self, level):
assert type(level) is int, 'An integer level is required to compare access rights.'
assert level >= 1, 'Level integer should be greater than or equal to 1.'
assert level <= 100, 'level integer should be lower than or equal to 100'
def _has_public_field(self, model):
return type(getattr(model, 'public', None)) is DeferredAttribute
def get_public_instances(self, model, default_level):
return model.objects \
.filter(public=True) \
.annotate(max_level=Value(default_level, IntegerField()))
def rights_filter(self, model, level, public=False):
"""
Return a model queryset matching a given access level for this user.
"""
self._check_level(level)
include_public = level <= Role.Guest.value and self._has_public_field(model)
# Handle special authentications
if self.user.is_anonymous:
# Anonymous users have Guest access on public instances only
if not include_public:
return model.objects.none()
return self.get_public_instances(model, Role.Guest.value) \
.order_by(*self.mixin_order_by_fields, 'id')
elif self.user.is_admin or self.user.is_internal:
# Superusers have an Admin access to all corpora
return model.objects.all() \
.annotate(max_level=Value(Role.Admin.value, IntegerField())) \
.order_by(*self.mixin_order_by_fields, 'id')
# Filter users rights and annotate the resulting level for those rights
queryset = model.objects \
.filter(
# Filter instances with rights concerning this user. This may create duplicates
Q(memberships__user=self.user)
| Q(memberships__group__memberships__user=self.user)
) \
.annotate(
# Keep only the lowest level for each right via group
max_level=functions.Least(
'memberships__level',
# In case of direct right, the group level will be skipped (Null value)
'memberships__group__memberships__level'
)
)
# Order by decreasing max_level to make sure we keep the max among all rights
queryset = queryset.filter(max_level__gte=level) \
.order_by(*self.mixin_order_by_fields, 'id', '-max_level') \
.distinct(*self.mixin_order_by_fields, 'id')
# Use a join to add public instances as this is the more elegant solution
if include_public:
queryset = queryset.union(self.get_public_instances(model, Role.Guest.value))
# Return distinct corpus with the max right level among matching rights
return queryset.order_by(*self.mixin_order_by_fields, 'id')
def has_access(self, instance, level):
self._check_level(level)
check_level_param(level)
# Handle special authentications
if level <= Role.Guest.value and getattr(instance, 'public', False):
return True
if self.user.is_admin or self.user.is_internal:
return True
return instance.memberships.filter(
Q(
# Right direcly owned by this user
......@@ -116,11 +55,15 @@ class RepositoryACLMixin(ACLMixin):
@property
def readable_repositories(self):
return self.rights_filter(Repository, Role.Guest.value)
return Repository.objects.filter(
id__in=filter_rights(self.user, Repository, Role.Guest.value).values('id')
)
@property
def executable_repositories(self):
return self.rights_filter(Repository, Role.Contributor.value)
return Repository.objects.filter(
id__in=filter_rights(self.user, Repository, Role.Contributor.value).values('id')
)
def has_read_access(self, repo):
return self.has_access(repo, Role.Guest.value)
......@@ -136,11 +79,15 @@ class NewCorpusACLMixin(ACLMixin):
@property
def readable_corpora(self):
return self.rights_filter(Corpus, Role.Guest.value)
return Corpus.objects.filter(
id__in=filter_rights(self.user, Corpus, Role.Guest.value).values('id')
)
@property
def writable_corpora(self):
return self.rights_filter(Corpus, Role.Contributor.value)
return Corpus.objects.filter(
id__in=filter_rights(self.user, Corpus, Role.Contributor.value).values('id')
)
@property
def administrable_corpora(self):
......
from django.db.models import IntegerField, Q, Value, functions
from django.db.models.query_utils import DeferredAttribute
from arkindex.users.models import Role
PUBLIC_LEVEL = Role.Guest.value
def has_public_field(model):
return type(getattr(model, 'public', None)) is DeferredAttribute
def get_public_instances(model):
return model.objects \
.filter(public=True) \
.annotate(max_level=Value(PUBLIC_LEVEL, IntegerField()))
def check_level_param(level):
assert type(level) is int, 'An integer level is required to compare access rights.'
assert level >= 1, 'Level integer should be greater than or equal to 1.'
assert level <= 100, 'level integer should be lower than or equal to 100'
def filter_rights(user, model, level):
"""
Return a generic queryset of objects with access rights for this user.
Level filtering parameter should be an integer between 1 and 100.
"""
check_level_param(level)
public = level <= Role.Guest.value and has_public_field(model)
# Handle special authentications
if user.is_anonymous:
# Anonymous users have Guest access on public instances only
if not public:
return model.objects.none()
return get_public_instances(model)
elif user.is_admin or user.is_internal:
# Superusers have an Admin access to all corpora
return model.objects.all() \
.annotate(max_level=Value(Role.Admin.value, IntegerField()))
# Filter users rights and annotate the resulting level for those rights
queryset = model.objects \
.filter(
# Filter instances with rights concerning this user
Q(memberships__user=user)
| Q(memberships__group__memberships__user=user)
) \
.annotate(
# Keep only the lowest level for each right via group
max_level=functions.Least(
'memberships__level',
# In case of direct right, the group level will be skipped (Null value)
'memberships__group__memberships__level'
)
) \
.filter(max_level__gte=level)
# Use a join to add public instances as this is the more elegant solution
if public:
queryset = queryset.union(get_public_instances(model))
return queryset
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment