From 6428be310e906d8437a767d7aa9f09b9cb3dad4d Mon Sep 17 00:00:00 2001
From: Valentin Rigal <rigal@teklia.com>
Date: Thu, 10 Dec 2020 10:56:57 +0100
Subject: [PATCH] Generic ACL mixin

---
 arkindex/project/mixins.py | 97 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 97 insertions(+)

diff --git a/arkindex/project/mixins.py b/arkindex/project/mixins.py
index 9cdd5994f4..3c0ffd6c6f 100644
--- a/arkindex/project/mixins.py
+++ b/arkindex/project/mixins.py
@@ -1,5 +1,6 @@
 from django.conf import settings
 from django.core.exceptions import PermissionDenied
+from django.db.models import Q, functions
 from django.shortcuts import get_object_or_404
 from django.views.decorators.cache import cache_page
 from rest_framework.exceptions import APIException, ValidationError
@@ -12,6 +13,102 @@ from arkindex.project.openapi import AutoSchema, SearchAutoSchema
 from arkindex.project.pagination import CustomCursorPagination
 
 
+class ACLMixin(object):
+    """
+    Access control mixin using the generic Right table.
+    """
+    _user = None
+
+    def __init__(self, user=None):
+        self._user = user
+
+    @property
+    def user(self):
+        return self._user or self.request.user
+
+    def rights_filter(self, model, level, public=False):
+        """
+        Return a model queryset matching a given access level for this user
+        """
+        if self.user.is_admin or self.user.is_internal:
+            return model.objects.all()
+        filters = Q(max_level__gte=level)
+        if (public):
+            # Allow access to public instances if the public parameter is set
+            filters = filters | Q(public=True)
+        return model.objects \
+            .filter(
+                Q(memberships__user=self.user)
+                | Q(memberships__group__memberships__user=self.user)
+            ) \
+            .annotate(max_level=functions.Least(
+                'memberships__level',
+                'memberships__group__memberships__level'
+            )) \
+            .filter(filters) \
+            .distinct()
+
+    def has_access(self, instance, level):
+        if self.user.is_admin or self.user.is_internal:
+            return True
+        return instance.memberships.filter(
+            Q(
+                # Right direcly owned by this user
+                Q(user=self.user)
+                & Q(level__gte=level)
+            )
+            | Q(
+                # Right owned by the group and by the user
+                Q(group__memberships__user=self.user)
+                & Q(level__gte=level)
+                & Q(group__memberships__level__gte=level)
+            )
+        ).exists()
+
+    def has_read_access(self, instance):
+        if not hasattr(instance, 'READ_LEVEL'):
+            return False
+        # Always allow a read access to a public instance
+        if hasattr(instance, 'public') and instance.public:
+            return True
+        return self.has_access(instance, instance.READ_LEVEL)
+
+    def has_write_access(self, instance):
+        if not hasattr(instance, 'WRITE_LEVEL'):
+            return False
+        return self.has_access(instance, instance.WRITE_LEVEL)
+
+    def has_admin_access(self, instance):
+        if not hasattr(instance, 'ADMIN_LEVEL'):
+            return False
+        return self.has_access(instance, instance.ADMIN_LEVEL)
+
+    def has_execute_access(self, instance):
+        if not hasattr(instance, 'EXECUTE_LEVEL'):
+            return False
+        return self.has_access(instance, instance.EXECUTE_LEVEL)
+
+
+class NewCorpusACLMixin(ACLMixin):
+    def readable_corpora(self):
+        return self.rights_filter(Corpus, Role.Guest.value, public=True)
+
+    def writable_corpora(self):
+        return self.rights_filter(Corpus, Role.Contributor.value)
+
+    def administrable_corpora(self):
+        return self.rights_filter(Corpus, Role.Admin.value)
+
+    def has_read_access(self, corpus):
+        return corpus.public or self.has_access(corpus, Role.Guest.value)
+
+    def has_write_access(self, corpus):
+        return self.has_access(corpus, Role.Contributor.value)
+
+    def has_admin_access(self, corpus):
+        return self.has_access(corpus, Role.Admin.value)
+
+
 class CorpusACLMixin(object):
 
     def get_corpus(self, corpus_id, right=Right.Read):
-- 
GitLab