Skip to content
19 changes: 13 additions & 6 deletions dojo/finding/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.db.models.query_utils import Q

from dojo.celery import app
from dojo.models import Finding, System_Settings
from dojo.models import Endpoint_Status, Finding, System_Settings

logger = logging.getLogger(__name__)
deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication")
Expand Down Expand Up @@ -295,12 +295,19 @@ def build_candidate_scope_queryset(test, mode="deduplication", service=None):
# Base prefetches for both modes
prefetch_list = ["endpoints", "vulnerability_id_set", "found_by"]

# Additional prefetches for reimport mode
# Additional prefetches for reimport mode: fetch only non-special endpoint statuses with their
# endpoint joined in, so endpoint_manager can read status_finding_non_special directly without
# any extra DB queries
if mode == "reimport":
prefetch_list.extend([
"status_finding",
"status_finding__endpoint",
])
prefetch_list.append(
Prefetch(
"status_finding",
queryset=Endpoint_Status.objects.exclude(
Q(false_positive=True) | Q(out_of_scope=True) | Q(risk_accepted=True),
).select_related("endpoint"),
to_attr="status_finding_non_special",
),
)

return (
queryset
Expand Down
8 changes: 2 additions & 6 deletions dojo/importers/default_reimporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,12 +766,8 @@ def process_matched_mitigated_finding(
else:
# TODO: Delete this after the move to Locations
# Reactivate mitigated endpoints that are not false positives, out of scope, or risk accepted
endpoint_statuses = existing_finding.status_finding.exclude(
Q(false_positive=True)
| Q(out_of_scope=True)
| Q(risk_accepted=True),
)
self.endpoint_manager.chunk_endpoints_and_reactivate(endpoint_statuses)
# status_finding_non_special is prefetched by build_candidate_scope_queryset
self.endpoint_manager.chunk_endpoints_and_reactivate(existing_finding.status_finding_non_special)
existing_finding.notes.add(note)
self.reactivated_items.append(existing_finding)
# The new finding is active while the existing on is mitigated. The existing finding needs to
Expand Down
5 changes: 3 additions & 2 deletions dojo/importers/endpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ def update_endpoint_status(
"""Update the list of endpoints from the new finding with the list that is in the old finding"""
# New endpoints are already added in serializers.py / views.py (see comment "# for existing findings: make sure endpoints are present or created")
# So we only need to mitigate endpoints that are no longer present
# using `.all()` will mark as mitigated also `endpoint_status` with flags `false_positive`, `out_of_scope` and `risk_accepted`. This is a known issue. This is not a bug. This is a future.
existing_finding_endpoint_status_list = existing_finding.status_finding.all()
# status_finding_non_special is prefetched by build_candidate_scope_queryset with the
# special-status exclusion and endpoint select_related already applied at the DB level
existing_finding_endpoint_status_list = existing_finding.status_finding_non_special
new_finding_endpoints_list = new_finding.unsaved_endpoints
if new_finding.is_mitigated:
# New finding is mitigated, so mitigate all old endpoints
Expand Down
16 changes: 9 additions & 7 deletions dojo/importers/location_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,19 @@ def update_location_status(
"""Update the list of locations from the new finding with the list that is in the old finding"""
# New endpoints are already added in serializers.py / views.py (see comment "# for existing findings: make sure endpoints are present or created")
# So we only need to mitigate endpoints that are no longer present
# using `.all()` will mark as mitigated also `endpoint_status` with flags `false_positive`, `out_of_scope` and `risk_accepted`. This is a known issue. This is not a bug. This is a future.

existing_location_refs: QuerySet[LocationFindingReference] = existing_finding.locations.exclude(
status__in=[
FindingLocationStatus.FalsePositive,
FindingLocationStatus.RiskAccepted,
FindingLocationStatus.OutOfScope,
],
)
if new_finding.is_mitigated:
# New finding is mitigated, so mitigate all existing location refs
self.chunk_locations_and_mitigate(existing_finding.locations.all(), user)
self.chunk_locations_and_mitigate(existing_location_refs, user)
else:
# New finding not mitigated; so, reactivate all refs
existing_location_refs: QuerySet[LocationFindingReference] = existing_finding.locations.all()

new_locations_values = [str(location) for location in new_finding.unsaved_locations]

new_locations_values = {str(location) for location in new_finding.unsaved_locations}
# Reactivate endpoints in the old finding that are in the new finding
location_refs_to_reactivate = existing_location_refs.filter(location__location_value__in=new_locations_values)
# Mitigate endpoints in the existing finding not in the new finding
Expand Down
29 changes: 23 additions & 6 deletions dojo/location/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,19 @@ def associate_with_finding(
audit_time: datetime | None = None,
) -> LocationFindingReference:
"""
Get or create a LocationFindingReference for this location and finding,
updating the status each time. Also associates the related product.
Get or create a LocationFindingReference for this location and finding.
Also associates the related product.
"""
# Check if there is an existing reference for this finding and location
# If this method is being used to set the status
if LocationFindingReference.objects.filter(
location=self,
finding=finding,
).exists():
return LocationFindingReference.objects.get(
location=self,
finding=finding,
)
# Determine the status
if status is None:
status = self.status_from_finding(finding)
Expand Down Expand Up @@ -144,10 +154,17 @@ def associate_with_product(
product: Product,
status: ProductLocationStatus | None = None,
) -> LocationProductReference:
"""
Get or create a LocationProductReference for this location and product,
updating the status each time.
"""
"""Get or create a LocationProductReference for this location and product"""
# Check if there is an existing reference for this finding and location
# If this method is being used to set the status
if LocationProductReference.objects.filter(
location=self,
product=product,
).exists():
return LocationProductReference.objects.get(
location=self,
product=product,
)
if status is None:
status = self.status_from_product(product)
# Use a transaction for safety in concurrent scenarios
Expand Down
69 changes: 69 additions & 0 deletions unittests/test_import_reimport.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.test.client import Client
from django.urls import reverse
from django.utils import timezone
from parameterized import parameterized

from dojo.models import Engagement, Finding, Product, Product_Type, Test, Test_Type, User

Expand Down Expand Up @@ -2003,6 +2004,74 @@ def test_import_nuclei_emptyc(self):
test_id2 = reimport0["test"]
self.assertEqual(test_id, test_id2)

@parameterized.expand(
[
("Test False Positive Status (Endpoint Status)", {"false_positive": True}, "status_finding"),
("Test Out of Scope Status (Endpoint Status)", {"out_of_scope": True}, "status_finding"),
("Test Risk Accepted Status (Endpoint Status)", {"risk_accepted": True}, "status_finding"),
("Test False Positive Status (Locations)", {"status": "FalsePositive"}, "locations"),
("Test Out of Scope Status (Locations)", {"status": "OutOfScope"}, "locations"),
("Test Risk Accepted Status (Locations)", {"status": "RiskAccepted"}, "locations"),
],
)
def test_import_reimport_endpoint_where_eps_reactivation_skips_special_status(self, label: str, special_status_fields: dict, m2m_key: str):
"""
When Findings are set to False Positive, Out of Scope, or Risk Accepted, they are not reactivated
because these statuses are often set by humans. The same needs to apply for the Endpoint Status as
they are an extension of the finding being partially mitigated.
"""
if settings.V3_FEATURE_LOCATIONS:
# TODO: Delete this after the move to Locations
if m2m_key == "status_finding":
# This test will fail for endpoint statuses with locations enabled
# return early here
return
context = {
"auditor": User.objects.get(username="admin"),
"audit_time": timezone.now(),
}
# TODO: Delete this after the move to Locations
else:
if m2m_key == "locations":
# This test will fail for locations with locations disabled
# return early here
return
context = {
"mitigated": True,
"mitigated_by": User.objects.get(username="admin"),
"mitigated_time": timezone.now(),
}
# Now start the test
with assertTestImportModelsCreated(self, imports=1, affected_findings=1, created=1):
import0 = self.import_scan_with_params(
self.gitlab_dast_file_name, self.scan_type_gitlab_dast, active=True, verified=True,
)
test_id = import0["test"]
findings = self.get_test_findings_api(test_id)
self.assert_finding_count_json(1, findings)
finding = Finding.objects.get(id=findings["results"][0]["id"])
# Get the related objects on the finding
related_obects = getattr(finding, m2m_key).all()
self.assertEqual(len(related_obects), 1)
# Update the related objects with the special status fields
related_objects_context = {**context, **special_status_fields}
related_obects.update(**related_objects_context)
# Reimport the same file
reimport0 = self.reimport_scan_with_params(
test_id, self.gitlab_dast_file_name, scan_type=self.scan_type_gitlab_dast,
)
test_id = reimport0["test"]
findings = self.get_test_findings_api(test_id)
self.assert_finding_count_json(1, findings)
finding = Finding.objects.get(id=findings["results"][0]["id"])
# Get the related objects on the finding
related_obects = getattr(finding, m2m_key).all()
self.assertEqual(len(related_obects), 1)
related_object = related_obects.first()
# Ensure the status is the same as the baseline
for key, value in related_objects_context.items():
self.assertEqual(getattr(related_object, key), value)

def test_import_reimport_endpoint_where_eps_date_is_different(self):
endpoint_count_before = self.db_endpoint_count()
endpoint_status_count_before_active = self.db_endpoint_status_count(mitigated=False)
Expand Down
12 changes: 6 additions & 6 deletions unittests/test_importers_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ def test_import_reimport_reimport_performance_pghistory_async(self):
self._import_reimport_performance(
expected_num_queries1=295,
expected_num_async_tasks1=6,
expected_num_queries2=227,
expected_num_queries2=226,
expected_num_async_tasks2=17,
expected_num_queries3=109,
expected_num_queries3=108,
expected_num_async_tasks3=16,
)

Expand All @@ -292,9 +292,9 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self):
self._import_reimport_performance(
expected_num_queries1=302,
expected_num_async_tasks1=6,
expected_num_queries2=234,
expected_num_queries2=233,
expected_num_async_tasks2=17,
expected_num_queries3=116,
expected_num_queries3=115,
expected_num_async_tasks3=16,
)

Expand All @@ -315,9 +315,9 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr
self._import_reimport_performance(
expected_num_queries1=309,
expected_num_async_tasks1=8,
expected_num_queries2=241,
expected_num_queries2=240,
expected_num_async_tasks2=19,
expected_num_queries3=120,
expected_num_queries3=119,
expected_num_async_tasks3=18,
)

Expand Down