11from crum import get_current_user
2- from django .db .models import Exists , OuterRef , Q
2+ from django .db .models import Q , Subquery
33
44from dojo .authorization .authorization import get_roles_for_permission , user_has_global_permission
55from dojo .models import (
1010 Product_Type_Group ,
1111 Product_Type_Member ,
1212)
13+ from dojo .request_cache import cache_for_request
1314
1415
15- def get_authorized_endpoints (permission , queryset = None , user = None ):
16-
16+ # Cached: all parameters are hashable, no dynamic queryset filtering
17+ @cache_for_request
18+ def get_authorized_endpoints (permission , user = None ):
19+ """Cached - returns all endpoints the user is authorized to see."""
1720 if user is None :
1821 user = get_current_user ()
1922
2023 if user is None :
2124 return Endpoint .objects .none ()
2225
23- endpoints = Endpoint .objects .all ().order_by ("id" ) if queryset is None else queryset
26+ endpoints = Endpoint .objects .all ().order_by ("id" )
2427
2528 if user .is_superuser :
2629 return endpoints
@@ -29,41 +32,86 @@ def get_authorized_endpoints(permission, queryset=None, user=None):
2932 return endpoints
3033
3134 roles = get_roles_for_permission (permission )
35+
36+ # Get authorized product/product_type IDs via subqueries
3237 authorized_product_type_roles = Product_Type_Member .objects .filter (
33- product_type = OuterRef ( "product__prod_type_id" ) ,
34- user = user ,
35- role__in = roles )
38+ user = user , role__in = roles ,
39+ ). values ( "product_type_id" )
40+
3641 authorized_product_roles = Product_Member .objects .filter (
37- product = OuterRef ( "product_id" ) ,
38- user = user ,
39- role__in = roles )
42+ user = user , role__in = roles ,
43+ ). values ( "product_id" )
44+
4045 authorized_product_type_groups = Product_Type_Group .objects .filter (
41- product_type = OuterRef ( "product__prod_type_id" ) ,
42- group__users = user ,
43- role__in = roles )
46+ group__users = user , role__in = roles ,
47+ ). values ( "product_type_id" )
48+
4449 authorized_product_groups = Product_Group .objects .filter (
45- product = OuterRef ("product_id" ),
46- group__users = user ,
47- role__in = roles )
48- endpoints = endpoints .annotate (
49- product__prod_type__member = Exists (authorized_product_type_roles ),
50- product__member = Exists (authorized_product_roles ),
51- product__prod_type__authorized_group = Exists (authorized_product_type_groups ),
52- product__authorized_group = Exists (authorized_product_groups ))
50+ group__users = user , role__in = roles ,
51+ ).values ("product_id" )
52+
53+ # Filter using IN with Subquery - no annotations needed
5354 return endpoints .filter (
54- Q (product__prod_type__member = True ) | Q (product__member = True )
55- | Q (product__prod_type__authorized_group = True ) | Q (product__authorized_group = True ))
55+ Q (product__prod_type_id__in = Subquery (authorized_product_type_roles ))
56+ | Q (product_id__in = Subquery (authorized_product_roles ))
57+ | Q (product__prod_type_id__in = Subquery (authorized_product_type_groups ))
58+ | Q (product_id__in = Subquery (authorized_product_groups )),
59+ )
5660
5761
58- def get_authorized_endpoint_status (permission , queryset = None , user = None ):
62+ def get_authorized_endpoints_for_queryset (permission , queryset , user = None ):
63+ """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter."""
64+ if user is None :
65+ user = get_current_user ()
5966
67+ if user is None :
68+ return Endpoint .objects .none ()
69+
70+ if user .is_superuser :
71+ return queryset
72+
73+ if user_has_global_permission (user , permission ):
74+ return queryset
75+
76+ roles = get_roles_for_permission (permission )
77+
78+ # Get authorized product/product_type IDs via subqueries
79+ authorized_product_type_roles = Product_Type_Member .objects .filter (
80+ user = user , role__in = roles ,
81+ ).values ("product_type_id" )
82+
83+ authorized_product_roles = Product_Member .objects .filter (
84+ user = user , role__in = roles ,
85+ ).values ("product_id" )
86+
87+ authorized_product_type_groups = Product_Type_Group .objects .filter (
88+ group__users = user , role__in = roles ,
89+ ).values ("product_type_id" )
90+
91+ authorized_product_groups = Product_Group .objects .filter (
92+ group__users = user , role__in = roles ,
93+ ).values ("product_id" )
94+
95+ # Filter using IN with Subquery - no annotations needed
96+ return queryset .filter (
97+ Q (product__prod_type_id__in = Subquery (authorized_product_type_roles ))
98+ | Q (product_id__in = Subquery (authorized_product_roles ))
99+ | Q (product__prod_type_id__in = Subquery (authorized_product_type_groups ))
100+ | Q (product_id__in = Subquery (authorized_product_groups )),
101+ )
102+
103+
104+ # Cached: all parameters are hashable, no dynamic queryset filtering
105+ @cache_for_request
106+ def get_authorized_endpoint_status (permission , user = None ):
107+ """Cached - returns all endpoint statuses the user is authorized to see."""
60108 if user is None :
61109 user = get_current_user ()
62110
63111 if user is None :
64112 return Endpoint_Status .objects .none ()
65113
66- endpoint_status = Endpoint_Status .objects .all ().order_by ("id" ) if queryset is None else queryset
114+ endpoint_status = Endpoint_Status .objects .all ().order_by ("id" )
67115
68116 if user .is_superuser :
69117 return endpoint_status
@@ -72,27 +120,70 @@ def get_authorized_endpoint_status(permission, queryset=None, user=None):
72120 return endpoint_status
73121
74122 roles = get_roles_for_permission (permission )
123+
124+ # Get authorized product/product_type IDs via subqueries
75125 authorized_product_type_roles = Product_Type_Member .objects .filter (
76- product_type = OuterRef ( "endpoint__product__prod_type_id" ) ,
77- user = user ,
78- role__in = roles )
126+ user = user , role__in = roles ,
127+ ). values ( "product_type_id" )
128+
79129 authorized_product_roles = Product_Member .objects .filter (
80- product = OuterRef ( "endpoint__product_id" ) ,
81- user = user ,
82- role__in = roles )
130+ user = user , role__in = roles ,
131+ ). values ( "product_id" )
132+
83133 authorized_product_type_groups = Product_Type_Group .objects .filter (
84- product_type = OuterRef ( "endpoint__product__prod_type_id" ) ,
85- group__users = user ,
86- role__in = roles )
134+ group__users = user , role__in = roles ,
135+ ). values ( "product_type_id" )
136+
87137 authorized_product_groups = Product_Group .objects .filter (
88- product = OuterRef ("endpoint__product_id" ),
89- group__users = user ,
90- role__in = roles )
91- endpoint_status = endpoint_status .annotate (
92- endpoint__product__prod_type__member = Exists (authorized_product_type_roles ),
93- endpoint__product__member = Exists (authorized_product_roles ),
94- endpoint__product__prod_type__authorized_group = Exists (authorized_product_type_groups ),
95- endpoint__product__authorized_group = Exists (authorized_product_groups ))
138+ group__users = user , role__in = roles ,
139+ ).values ("product_id" )
140+
141+ # Filter using IN with Subquery - no annotations needed
96142 return endpoint_status .filter (
97- Q (endpoint__product__prod_type__member = True ) | Q (endpoint__product__member = True )
98- | Q (endpoint__product__prod_type__authorized_group = True ) | Q (endpoint__product__authorized_group = True ))
143+ Q (endpoint__product__prod_type_id__in = Subquery (authorized_product_type_roles ))
144+ | Q (endpoint__product_id__in = Subquery (authorized_product_roles ))
145+ | Q (endpoint__product__prod_type_id__in = Subquery (authorized_product_type_groups ))
146+ | Q (endpoint__product_id__in = Subquery (authorized_product_groups )),
147+ )
148+
149+
150+ def get_authorized_endpoint_status_for_queryset (permission , queryset , user = None ):
151+ """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter."""
152+ if user is None :
153+ user = get_current_user ()
154+
155+ if user is None :
156+ return Endpoint_Status .objects .none ()
157+
158+ if user .is_superuser :
159+ return queryset
160+
161+ if user_has_global_permission (user , permission ):
162+ return queryset
163+
164+ roles = get_roles_for_permission (permission )
165+
166+ # Get authorized product/product_type IDs via subqueries
167+ authorized_product_type_roles = Product_Type_Member .objects .filter (
168+ user = user , role__in = roles ,
169+ ).values ("product_type_id" )
170+
171+ authorized_product_roles = Product_Member .objects .filter (
172+ user = user , role__in = roles ,
173+ ).values ("product_id" )
174+
175+ authorized_product_type_groups = Product_Type_Group .objects .filter (
176+ group__users = user , role__in = roles ,
177+ ).values ("product_type_id" )
178+
179+ authorized_product_groups = Product_Group .objects .filter (
180+ group__users = user , role__in = roles ,
181+ ).values ("product_id" )
182+
183+ # Filter using IN with Subquery - no annotations needed
184+ return queryset .filter (
185+ Q (endpoint__product__prod_type_id__in = Subquery (authorized_product_type_roles ))
186+ | Q (endpoint__product_id__in = Subquery (authorized_product_roles ))
187+ | Q (endpoint__product__prod_type_id__in = Subquery (authorized_product_type_groups ))
188+ | Q (endpoint__product_id__in = Subquery (authorized_product_groups )),
189+ )
0 commit comments