Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class OrganizationOpenPeriodsEndpoint(OrganizationEndpoint):
permission_classes = (OrganizationDetectorPermission,)

def get_group_from_detector_id(
self, detector_id: str, organization: Organization
self, request: Request, detector_id: str, organization: Organization
) -> Group | None:
validated_detector_id = to_valid_int_id("detectorId", detector_id)
try:
Expand All @@ -56,13 +56,18 @@ def get_group_from_detector_id(
if detector.project.organization_id != organization.id:
raise ValidationError({"detectorId": "Detector not found"})

if not request.access.has_project_access(detector.project):
raise ValidationError({"detectorId": "Detector not found"})

detector_group = (
DetectorGroup.objects.filter(detector=detector).order_by("-date_added").first()
)

return detector_group.group if detector_group else None

def get_group_from_group_id(self, group_id: str, organization: Organization) -> Group | None:
def get_group_from_group_id(
self, request: Request, group_id: str, organization: Organization
) -> Group | None:
validated_group_id = to_valid_int_id("groupId", group_id)
try:
group = Group.objects.select_related("project").get(id=validated_group_id)
Expand All @@ -72,6 +77,9 @@ def get_group_from_group_id(self, group_id: str, organization: Organization) ->
if group.project.organization_id != organization.id:
raise ValidationError({"groupId": "Group not found"})

if not request.access.has_project_access(group.project):
raise ValidationError({"groupId": "Group not found"})

return group

@extend_schema(
Expand Down Expand Up @@ -134,10 +142,10 @@ def get(self, request: Request, organization: Organization) -> Response:
raise ValidationError({"detail": "Must provide only one of detectorId or groupId"})

target_group: Group | None = (
self.get_group_from_detector_id(detector_id_param, organization)
self.get_group_from_detector_id(request, detector_id_param, organization)
if detector_id_param
else (
self.get_group_from_group_id(group_id_param, organization)
self.get_group_from_group_id(request, group_id_param, organization)
if group_id_param
else None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,68 @@ def test_open_periods_group_id(self) -> None:
def test_validation_error_when_missing_params(self) -> None:
self.get_error_response(*self.get_url_args(), status_code=400)

def test_group_id_returns_400_when_user_lacks_project_access(self) -> None:
self.organization.flags.allow_joinleave = False
self.organization.save()

restricted_team = self.create_team(organization=self.organization)
restricted_project = self.create_project(
organization=self.organization, teams=[restricted_team]
)
restricted_group = self.create_group(
project=restricted_project,
type=MetricIssue.type_id,
priority=PriorityLevel.LOW,
)

other_team = self.create_team(organization=self.organization)
outsider = self.create_user()
self.create_member(
user=outsider,
organization=self.organization,
role="member",
teams=[other_team],
)
self.login_as(user=outsider)

self.get_error_response(
*self.get_url_args(),
qs_params={"groupId": restricted_group.id},
status_code=400,
)

def test_detector_id_returns_400_when_user_lacks_project_access(self) -> None:
self.organization.flags.allow_joinleave = False
self.organization.save()

restricted_team = self.create_team(organization=self.organization)
restricted_project = self.create_project(
organization=self.organization, teams=[restricted_team]
)
restricted_detector = self.create_detector(project=restricted_project)
restricted_group = self.create_group(
project=restricted_project,
type=MetricIssue.type_id,
priority=PriorityLevel.LOW,
)
DetectorGroup.objects.create(detector=restricted_detector, group=restricted_group)

other_team = self.create_team(organization=self.organization)
outsider = self.create_user()
self.create_member(
user=outsider,
organization=self.organization,
role="member",
teams=[other_team],
)
self.login_as(user=outsider)

self.get_error_response(
*self.get_url_args(),
qs_params={"detectorId": restricted_detector.id},
status_code=400,
)

def test_open_periods_resolved_group(self) -> None:
self.group.status = GroupStatus.RESOLVED
self.group.save()
Expand Down
Loading