- """.format(
- p1.get_admin_url(), p1.filename, p1.id
- )
+ """.format(p1.get_admin_url(), p1.filename, p1.id)
self.assertContains(response, expected, html=True)
self.assertIsNone(p1.usage_policy)
diff --git a/component_catalog/tests/test_views.py b/component_catalog/tests/test_views.py
index 403c1d9f..86791991 100644
--- a/component_catalog/tests/test_views.py
+++ b/component_catalog/tests/test_views.py
@@ -4133,9 +4133,7 @@ def test_component_catalog_list_view_request_links(self):
expected = """
R
- """.format(
- self.component1.get_absolute_url()
- )
+ """.format(self.component1.get_absolute_url())
self.assertContains(response, expected, html=True)
def test_component_list_multi_send_about_files_view(self):
@@ -4578,7 +4576,7 @@ def test_related_label_has_correct_target(self):
reverse("grp_related_lookup"), self.c1.pk
)
response = self.client.get(url)
- expected = [{"label": "{0}".format(self.c1), "safe": False, "value": str(self.c1.pk)}]
+ expected = [{"label": str(self.c1), "safe": False, "value": str(self.c1.pk)}]
self.assertEqual(expected, json.loads(response.content))
def test_reference_dataspace_users_access_another_dataspaces_through_grappelli_lookup(self):
@@ -4591,7 +4589,7 @@ def test_reference_dataspace_users_access_another_dataspaces_through_grappelli_l
response = self.client.get(url)
expected = [
{
- "label": "{0}".format(self.other_component),
+ "label": str(self.other_component),
"safe": False,
"value": str(self.other_component.pk),
}
diff --git a/component_catalog/views.py b/component_catalog/views.py
index 22ed0fc2..87b85dca 100644
--- a/component_catalog/views.py
+++ b/component_catalog/views.py
@@ -2337,7 +2337,7 @@ def scan_detected_package_fields(self, key_files_packages):
for label, scan_field in ScanCodeIO.SCAN_PACKAGE_FIELD:
if value := self.detected_package_data.get(scan_field):
if isinstance(value, list):
- value = format_html(" ".join(([escape(entry) for entry in value])))
+ value = format_html(" ".join([escape(entry) for entry in value]))
else:
value = escape(value)
detected_package_fields.append((label, value))
diff --git a/dejacode/settings.py b/dejacode/settings.py
index f14bc897..53f1bd80 100644
--- a/dejacode/settings.py
+++ b/dejacode/settings.py
@@ -13,6 +13,9 @@
from pathlib import Path
import environ
+import ldap
+from django_auth_ldap.config import GroupOfNamesType
+from django_auth_ldap.config import LDAPSearch
# The home directory of the dejacode user that owns the installation.
PROJECT_DIR = environ.Path(__file__) - 1
@@ -685,10 +688,6 @@ def get_fake_redis_connection(config, use_strict_redis):
# LDAP Configuration
-import ldap
-from django_auth_ldap.config import GroupOfNamesType
-from django_auth_ldap.config import LDAPSearch
-
# This authentication backend enables users to authenticate against an
# LDAP server.
# To enable LDAP Authentication, first, set the following in your .env
diff --git a/dejacode_toolkit/utils.py b/dejacode_toolkit/utils.py
index f14de450..30517103 100644
--- a/dejacode_toolkit/utils.py
+++ b/dejacode_toolkit/utils.py
@@ -11,7 +11,7 @@
def sha1(content):
"""Return the sha1 hash of the given content."""
- return hashlib.sha1(content).hexdigest() # nosec
+ return hashlib.sha1(content, usedforsecurity=False).hexdigest()
def sha256(content):
@@ -26,4 +26,4 @@ def sha512(content):
def md5(content):
"""Return the md5 hash of the given content."""
- return hashlib.md5(content).hexdigest() # nosec
+ return hashlib.md5(content, usedforsecurity=False).hexdigest()
diff --git a/dje/client_data.py b/dje/client_data.py
index f283362e..37b920bd 100644
--- a/dje/client_data.py
+++ b/dje/client_data.py
@@ -12,11 +12,11 @@ def add_client_data(request, **kwargs):
Set values on the request, to be available in the JavaScript client data object.
On the client side, the values are accessible through ``NEXB.client_data``.
"""
- if not hasattr(request, 'client_data'):
+ if not hasattr(request, "client_data"):
request.client_data = {}
request.client_data.update(kwargs)
def client_data_context_processor(request):
- client_data = getattr(request, 'client_data', {})
- return {'client_data': client_data}
+ client_data = getattr(request, "client_data", {})
+ return {"client_data": client_data}
diff --git a/dje/management/commands/checkdata.py b/dje/management/commands/checkdata.py
index 7c5d2d50..4de45e7e 100644
--- a/dje/management/commands/checkdata.py
+++ b/dje/management/commands/checkdata.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
@@ -17,33 +16,44 @@
class Command(BaseCommand):
- help = 'Checks the given Dataspace for potential data problems.'
+ help = "Checks the given Dataspace for potential data problems."
requires_system_checks = []
def add_arguments(self, parser):
- parser.add_argument('dataspace', nargs='?', help='Name of the Dataspace.')
- parser.add_argument('--tag', '-t', action='append', dest='tags',
- help='Run only checks labeled with given tag.')
- parser.add_argument('--list-tags', action='store_true', dest='list_tags',
- help='List available tags.')
- parser.add_argument('--all-dataspaces', action='store_true', dest='all_dataspaces',
- help='Run the checks on all Dataspaces.')
+ parser.add_argument("dataspace", nargs="?", help="Name of the Dataspace.")
+ parser.add_argument(
+ "--tag",
+ "-t",
+ action="append",
+ dest="tags",
+ help="Run only checks labeled with given tag.",
+ )
+ parser.add_argument(
+ "--list-tags", action="store_true", dest="list_tags", help="List available tags."
+ )
+ parser.add_argument(
+ "--all-dataspaces",
+ action="store_true",
+ dest="all_dataspaces",
+ help="Run the checks on all Dataspaces.",
+ )
def handle(self, *args, **options):
- if options['list_tags']:
- self.stdout.write('\n'.join(sorted(registry.tags_available())))
+ if options["list_tags"]:
+ self.stdout.write("\n".join(sorted(registry.tags_available())))
return
- dataspace = options.get('dataspace')
- tags = options.get('tags')
+ dataspace = options.get("dataspace")
+ tags = options.get("tags")
- if not dataspace and not options['all_dataspaces']:
- raise CommandError('Enter a Dataspace.')
+ if not dataspace and not options["all_dataspaces"]:
+ raise CommandError("Enter a Dataspace.")
- special_tags = ['reporting'], ['expression']
- if options['all_dataspaces'] and tags not in special_tags:
- raise CommandError('--all-dataspaces only usable with `--tag reporting` or '
- '`--tag expression`')
+ special_tags = ["reporting"], ["expression"]
+ if options["all_dataspaces"] and tags not in special_tags:
+ raise CommandError(
+ "--all-dataspaces only usable with `--tag reporting` or " "`--tag expression`"
+ )
app_configs = {}
@@ -51,11 +61,11 @@ def handle(self, *args, **options):
try:
dataspace = Dataspace.objects.get(name=dataspace)
except Dataspace.DoesNotExist:
- raise CommandError(f'The Dataspace {dataspace} does not exit.')
+ raise CommandError(f"The Dataspace {dataspace} does not exit.")
# Using `app_configs` as a workaround to provide the dataspace to
# the check commands.
- app_configs = {'dataspace': dataspace}
+ app_configs = {"dataspace": dataspace}
if tags:
try:
@@ -65,7 +75,7 @@ def handle(self, *args, **options):
else:
raise CommandError(f'No data check for the "{invalid_tag}" tag.')
else:
- tags = ['data', 'reporting', 'expression'] # default tags
+ tags = ["data", "reporting", "expression"] # default tags
self.check(
app_configs=app_configs,
diff --git a/dje/management/commands/checkmigrations.py b/dje/management/commands/checkmigrations.py
index 56f512e7..46faa53b 100644
--- a/dje/management/commands/checkmigrations.py
+++ b/dje/management/commands/checkmigrations.py
@@ -38,10 +38,7 @@ def add_arguments(self, parser):
parser.add_argument(
"--database",
default=DEFAULT_DB_ALIAS,
- help=(
- 'Nominates a database to synchronize. Defaults to the "default" '
- "database."
- ),
+ help='Nominates a database to synchronize. Defaults to the "default" ' "database.",
)
def handle(self, *args, **options):
diff --git a/dje/management/commands/checks.py b/dje/management/commands/checks.py
index cd2c3ca0..8d355344 100644
--- a/dje/management/commands/checks.py
+++ b/dje/management/commands/checks.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
@@ -441,7 +440,7 @@ def check_for_fields_value_inconsistencies(app_configs, **kwargs):
group_list = list(group)
for field_name in fields_to_check:
- if type(tuple()) == type(field_name):
+ if isinstance(field_name, tuple):
field_name, method = field_name
inconsistencies = {str(getattr(instance, method)()) for instance in group_list}
else:
diff --git a/dje/management/commands/clonedataset.py b/dje/management/commands/clonedataset.py
index 04213bad..b8e06573 100644
--- a/dje/management/commands/clonedataset.py
+++ b/dje/management/commands/clonedataset.py
@@ -28,77 +28,89 @@
class Command(BaseCommand):
- help = 'Copy all the data for the reference dataspace into the target one.'
+ help = "Copy all the data for the reference dataspace into the target one."
def add_arguments(self, parser):
- parser.add_argument('reference', help='Name of the reference Dataspace.')
- parser.add_argument('target', help='Name of the target Dataspace.')
- parser.add_argument('username', help='Your username.')
- parser.add_argument('--license_library', action='store_true', dest='license_library',
- default=False,
- help='Limit the clone to License Library objects.')
- parser.add_argument('--product_portfolio', action='store_true', dest='product_portfolio',
- default=False,
- help='Include Product Portfolio models.'
- 'WARNING: This should only be used when cloning a Template '
- 'Dataspace')
+ parser.add_argument("reference", help="Name of the reference Dataspace.")
+ parser.add_argument("target", help="Name of the target Dataspace.")
+ parser.add_argument("username", help="Your username.")
+ parser.add_argument(
+ "--license_library",
+ action="store_true",
+ dest="license_library",
+ default=False,
+ help="Limit the clone to License Library objects.",
+ )
+ parser.add_argument(
+ "--product_portfolio",
+ action="store_true",
+ dest="product_portfolio",
+ default=False,
+ help="Include Product Portfolio models."
+ "WARNING: This should only be used when cloning a Template "
+ "Dataspace",
+ )
def handle(self, *args, **options):
try:
- reference = Dataspace.objects.get(name=options.get('reference'))
- target = Dataspace.objects.get(name=options.get('target'))
+ reference = Dataspace.objects.get(name=options.get("reference"))
+ target = Dataspace.objects.get(name=options.get("target"))
except ObjectDoesNotExist:
- raise CommandError('One of the Dataspace does not exist.')
+ raise CommandError("One of the Dataspace does not exist.")
if reference == target:
- raise CommandError('reference and target must be different.')
+ raise CommandError("reference and target must be different.")
try:
- user = get_user_model().objects.get(username=options.get('username'))
+ user = get_user_model().objects.get(username=options.get("username"))
except ObjectDoesNotExist:
- raise CommandError('The given username does not exist.')
+ raise CommandError("The given username does not exist.")
- if options.get('license_library'):
+ if options.get("license_library"):
models = LICENSE_LIBRARY_MODELS[:]
else:
models = ALL_MODELS_NO_PP[:]
models.extend(REPORTING_MODELS)
models.extend(POLICY_MODELS)
- models.extend([
- ExternalSource,
- Priority,
- RequestTemplate,
- Question,
- ])
+ models.extend(
+ [
+ ExternalSource,
+ Priority,
+ RequestTemplate,
+ Question,
+ ]
+ )
# The following models cannot be added until the copy supports secured manager.
# ProductComponent, ProductAssignedLicense, ProductComponentAssignedLicense
# Also, Product.objects always Return None if no user is provided.
- if options.get('product_portfolio'):
- models.extend([
- ProductStatus,
- ProductRelationStatus,
- ])
+ if options.get("product_portfolio"):
+ models.extend(
+ [
+ ProductStatus,
+ ProductRelationStatus,
+ ]
+ )
# Explicit empty exclude dict for each Models to enforce an exact copy
- copy_kwargs = {'exclude': {}}
+ copy_kwargs = {"exclude": {}}
for model in models:
- copy_kwargs['exclude'][model] = []
+ copy_kwargs["exclude"][model] = []
# Do not trigger m2m or o2m copy cascade, respect the copy order from
# the `models` list.
- copy_kwargs['skip_m2m_and_o2m'] = True
+ copy_kwargs["skip_m2m_and_o2m"] = True
errors = []
for model in models:
- if model.__name__ == 'Component':
+ if model.__name__ == "Component":
reference_qs = get_component_limited_qs(dataspace=reference)
else:
reference_qs = model.objects.scope(reference)
- msg = '{}: copying {} objects...'.format(model.__name__, reference_qs.count())
+ msg = "{}: copying {} objects...".format(model.__name__, reference_qs.count())
self.stdout.write(msg)
for instance in reference_qs:
@@ -110,11 +122,11 @@ def handle(self, *args, **options):
# Verification
target_qs = model.objects.scope(target)
if len(reference_qs) == target_qs.count():
- self.stdout.write('Copy completed.')
+ self.stdout.write("Copy completed.")
else:
- self.stdout.write('Errors!')
+ self.stdout.write("Errors!")
- self.stdout.write('Data copy completed.')
+ self.stdout.write("Data copy completed.")
if errors:
- self.stderr.write('\n\n'.join(errors))
+ self.stderr.write("\n\n".join(errors))
diff --git a/dje/management/commands/dumpdataset.py b/dje/management/commands/dumpdataset.py
index 1c8620d5..d4cd4448 100644
--- a/dje/management/commands/dumpdataset.py
+++ b/dje/management/commands/dumpdataset.py
@@ -40,7 +40,7 @@
class ExcludeFieldsSerializer(Serializer):
exclude_fields = [
- 'request_count',
+ "request_count",
]
def handle_field(self, obj, field):
@@ -54,88 +54,134 @@ def handle_field(self, obj, field):
class Command(BaseCommand):
- help = ('Output the contents of the all DejaCode data for the '
- 'given Dataspace as a fixture.')
+ help = "Output the contents of the all DejaCode data for the " "given Dataspace as a fixture."
def add_arguments(self, parser):
- parser.add_argument('dataspace_name', help='Name of the Dataspace.')
+ parser.add_argument("dataspace_name", help="Name of the Dataspace.")
parser.add_argument(
- '--dataspace', action='store_true', dest='dataspace', default=False,
- help='Only the given Dataspace will be dumped.')
+ "--dataspace",
+ action="store_true",
+ dest="dataspace",
+ default=False,
+ help="Only the given Dataspace will be dumped.",
+ )
parser.add_argument(
- '--user', action='store_true', dest='user', default=False,
- help='Only Dataspace and User Models will be dumped.')
+ "--user",
+ action="store_true",
+ dest="user",
+ default=False,
+ help="Only Dataspace and User Models will be dumped.",
+ )
parser.add_argument(
- '--external', action='store_true', dest='external', default=False,
- help='Only ExternalSource instances will be dumped.')
+ "--external",
+ action="store_true",
+ dest="external",
+ default=False,
+ help="Only ExternalSource instances will be dumped.",
+ )
parser.add_argument(
- '--policy', action='store_true', dest='policy', default=False,
- help='Only UsagePolicy instances will be dumped.')
+ "--policy",
+ action="store_true",
+ dest="policy",
+ default=False,
+ help="Only UsagePolicy instances will be dumped.",
+ )
parser.add_argument(
- '--organization', action='store_true', dest='organization', default=False,
- help='Only Organization Models will be dumped.')
+ "--organization",
+ action="store_true",
+ dest="organization",
+ default=False,
+ help="Only Organization Models will be dumped.",
+ )
parser.add_argument(
- '--license_library', action='store_true', dest='license_library', default=False,
- help='Only License Library Models will be dumped.')
+ "--license_library",
+ action="store_true",
+ dest="license_library",
+ default=False,
+ help="Only License Library Models will be dumped.",
+ )
parser.add_argument(
- '--component_catalog', action='store_true', dest='component_catalog', default=False,
- help='Only Component Catalog Models with some extra filtering will be dumped.')
+ "--component_catalog",
+ action="store_true",
+ dest="component_catalog",
+ default=False,
+ help="Only Component Catalog Models with some extra filtering will be dumped.",
+ )
parser.add_argument(
- '--workflow', action='store_true', dest='workflow', default=False,
- help='Only Workflow Models will be dumped.')
+ "--workflow",
+ action="store_true",
+ dest="workflow",
+ default=False,
+ help="Only Workflow Models will be dumped.",
+ )
parser.add_argument(
- '--reporting', action='store_true', dest='reporting', default=False,
- help='Only Reporting Models will be dumped.')
+ "--reporting",
+ action="store_true",
+ dest="reporting",
+ default=False,
+ help="Only Reporting Models will be dumped.",
+ )
parser.add_argument(
- '--product_portfolio', action='store_true', dest='product_portfolio', default=False,
- help='Only Product Models will be dumped.')
+ "--product_portfolio",
+ action="store_true",
+ dest="product_portfolio",
+ default=False,
+ help="Only Product Models will be dumped.",
+ )
def handle(self, *args, **options):
- dataspace_name = options.get('dataspace_name')
+ dataspace_name = options.get("dataspace_name")
try:
dataspace = Dataspace.objects.get(name=dataspace_name)
except Dataspace.DoesNotExist:
- raise CommandError(f'The Dataspace {dataspace_name} does not exit.')
+ raise CommandError(f"The Dataspace {dataspace_name} does not exit.")
models = []
data = []
- if options.get('dataspace'):
+ if options.get("dataspace"):
data = [dataspace]
- if options.get('user'):
+ if options.get("user"):
data = [dataspace] # includes the dataspace
data += list(get_user_model().objects.scope(dataspace))
- if options.get('external'):
+ if options.get("external"):
models = [ExternalSource]
- if options.get('policy'):
+ if options.get("policy"):
models = POLICY_MODELS[:]
- if options.get('organization'):
+ if options.get("organization"):
owner_limited_qs = get_owner_limited_qs(dataspace)
data += list(owner_limited_qs)
- subowner_limited_qs = Subowner.objects\
- .filter(
+ subowner_limited_qs = (
+ Subowner.objects.filter(
parent__in=owner_limited_qs,
child__in=owner_limited_qs,
- ).select_related(
- 'parent__dataspace',
- 'child__dataspace',
- ).distinct()
+ )
+ .select_related(
+ "parent__dataspace",
+ "child__dataspace",
+ )
+ .distinct()
+ )
data += list(subowner_limited_qs)
- if options.get('license_library'):
+ if options.get("license_library"):
models = LICENSE_LIBRARY_MODELS[:]
- if options.get('product_portfolio'):
- component_qs = Component.objects.scope(dataspace)\
- .filter(productcomponents__isnull=False)\
+ if options.get("product_portfolio"):
+ component_qs = (
+ Component.objects.scope(dataspace)
+ .filter(productcomponents__isnull=False)
.exclude(id__in=get_component_limited_qs(dataspace))
- owner_qs = Owner.objects.scope(dataspace)\
- .filter(component__in=component_qs)\
+ )
+ owner_qs = (
+ Owner.objects.scope(dataspace)
+ .filter(component__in=component_qs)
.exclude(id__in=get_owner_limited_qs(dataspace))
+ )
package_qs = Package.objects.scope(dataspace).filter(productpackages__isnull=False)
data += list(owner_qs)
@@ -144,21 +190,23 @@ def handle(self, *args, **options):
models = PRODUCT_PORTFOLIO_MODELS[:]
- if options.get('component_catalog'):
+ if options.get("component_catalog"):
data += list(ComponentType.objects.scope(dataspace))
data += list(ComponentStatus.objects.scope(dataspace))
data += list(AcceptableLinkage.objects.scope(dataspace))
components = get_component_limited_qs(dataspace)
- component_ids = list(components.values_list('id', flat=True))
+ component_ids = list(components.values_list("id", flat=True))
data += list(components)
- subcomponents = Subcomponent.objects.filter(
- parent_id__in=component_ids, child_id__in=component_ids
- ).select_related(
- 'parent__dataspace',
- 'child__dataspace',
- ).distinct()
+ subcomponents = (
+ Subcomponent.objects.filter(parent_id__in=component_ids, child_id__in=component_ids)
+ .select_related(
+ "parent__dataspace",
+ "child__dataspace",
+ )
+ .distinct()
+ )
data += list(subcomponents)
# From Django docs:
@@ -167,33 +215,39 @@ def handle(self, *args, **options):
# The handle_assigned_licenses() method is not called so we need to dump
# ComponentAssignedLicense and SubcomponentAssignedLicense models data
data += list(
- ComponentAssignedLicense.objects
- .filter(component__id__in=component_ids)
- .select_related()
+ ComponentAssignedLicense.objects.filter(
+ component__id__in=component_ids
+ ).select_related()
)
data += list(
- SubcomponentAssignedLicense.objects
- .filter(subcomponent__in=subcomponents)
- .select_related()
+ SubcomponentAssignedLicense.objects.filter(
+ subcomponent__in=subcomponents
+ ).select_related()
)
data += list(ComponentKeyword.objects.scope(dataspace))
packages = (
- Package.objects
- .filter(componentassignedpackage__component__id__in=component_ids)
- .select_related().distinct()
+ Package.objects.filter(componentassignedpackage__component__id__in=component_ids)
+ .select_related()
+ .distinct()
)
data += list(packages)
- data += list(ComponentAssignedPackage.objects.filter(
- component__id__in=component_ids).select_related().distinct())
- data += list(PackageAssignedLicense.objects.filter(
- package__in=packages).select_related().distinct())
+ data += list(
+ ComponentAssignedPackage.objects.filter(component__id__in=component_ids)
+ .select_related()
+ .distinct()
+ )
+ data += list(
+ PackageAssignedLicense.objects.filter(package__in=packages)
+ .select_related()
+ .distinct()
+ )
- if options.get('workflow'):
+ if options.get("workflow"):
models = [RequestTemplate, Question, Priority]
- if options.get('reporting'):
+ if options.get("reporting"):
models = REPORTING_MODELS[:]
for model_class in models:
diff --git a/dje/management/commands/dumpinitdata.py b/dje/management/commands/dumpinitdata.py
index 0153070a..c9199305 100644
--- a/dje/management/commands/dumpinitdata.py
+++ b/dje/management/commands/dumpinitdata.py
@@ -29,14 +29,14 @@
class Command(BaseCommand):
def add_arguments(self, parser):
- parser.add_argument('dataspace_name', help='Name of the Dataspace.')
+ parser.add_argument("dataspace_name", help="Name of the Dataspace.")
def handle(self, *args, **options):
- dataspace_name = options.get('dataspace_name')
+ dataspace_name = options.get("dataspace_name")
try:
dataspace = Dataspace.objects.get(name=dataspace_name)
except Dataspace.DoesNotExist:
- raise CommandError(f'The Dataspace {dataspace_name} does not exit.')
+ raise CommandError(f"The Dataspace {dataspace_name} does not exit.")
models = []
data = [
diff --git a/dje/management/commands/flushdataset.py b/dje/management/commands/flushdataset.py
index de2f6b80..4ccf36de 100644
--- a/dje/management/commands/flushdataset.py
+++ b/dje/management/commands/flushdataset.py
@@ -21,18 +21,20 @@
class Command(DataspacedCommand):
- help = ('Removes all the data related to a given Dataspace from '
- 'the database. '
- 'Use the option --keep-users to keep the Users.')
+ help = (
+ "Removes all the data related to a given Dataspace from "
+ "the database. "
+ "Use the option --keep-users to keep the Users."
+ )
def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument(
- '--keep-users',
- action='store_true',
- dest='keep_users',
+ "--keep-users",
+ action="store_true",
+ dest="keep_users",
default=False,
- help='Keeps the Users and the Dataspace.',
+ help="Keeps the Users and the Dataspace.",
)
def handle(self, *args, **options):
@@ -44,14 +46,17 @@ def handle(self, *args, **options):
models += REPORTING_MODELS
models.reverse() # The order matters, see the protected FK
- models.extend([
- # WARNING: The following Policy models order matters and needs to be after main models
- AssociatedPolicy,
- UsagePolicy,
- ExternalReference,
- ExternalSource,
- Webhook,
- ])
+ models.extend(
+ [
+ # WARNING: The following Policy models order matters and needs to be after
+ # main models.
+ AssociatedPolicy,
+ UsagePolicy,
+ ExternalReference,
+ ExternalSource,
+ Webhook,
+ ]
+ )
# Clear the associated_product_relation_status FK values so the
# ProductRelationStatus model can be flushed before the UsagePolicy model.
@@ -60,12 +65,12 @@ def handle(self, *args, **options):
for model_class in models:
qs = get_unsecured_manager(model_class).filter(dataspace=dataspace)
- if options['verbosity'] > 1:
- self.stdout.write(f'Deleting {qs.count()} {model_class.__name__}...')
+ if options["verbosity"] > 1:
+ self.stdout.write(f"Deleting {qs.count()} {model_class.__name__}...")
qs.delete()
# This is the case where --keep-users is NOT given
- if not options['keep_users']:
+ if not options["keep_users"]:
get_user_model().objects.scope(dataspace).delete()
dataspace.delete()
diff --git a/dje/registration.py b/dje/registration.py
index 24fd9de1..dbee55d3 100644
--- a/dje/registration.py
+++ b/dje/registration.py
@@ -126,9 +126,8 @@ def __init__(self, *args, **kwargs):
self.fields["hcaptcha"].label = ""
- self.fields["updates_email_notification"].label = (
- "Receive updates on DejaCode features and news"
- )
+ notification_label = "Receive updates on DejaCode features and news"
+ self.fields["updates_email_notification"].label = notification_label
for field in self.fields.values():
field.help_text = None
diff --git a/dje/tests/test_access.py b/dje/tests/test_access.py
index b99a9ae0..f440c759 100644
--- a/dje/tests/test_access.py
+++ b/dje/tests/test_access.py
@@ -86,9 +86,7 @@ def __call__(self, model, _quantity=None, make_m2m=False, **attrs):
return self.make(model, _quantity, make_m2m, **attrs)
def make(self, model, _quantity=None, make_m2m=False, **attrs):
- """
- Create persisted instances from a given model and associated models.
- """
+ """Create persisted instances from a given model and associated models."""
if is_dataspace_related(model):
attrs["dataspace"] = self.dataspace
@@ -130,9 +128,7 @@ def required(self):
def collect_views():
- """
- Yield all view and url data collected from the root urlconf.
- """
+ """Yield all view and url data collected from the root urlconf."""
urlconf = __import__(settings.ROOT_URLCONF, {}, {}, [""])
views_from_urls = extract_views_from_urlpatterns(urlconf.urlpatterns)
for func, regex, namespace, url_name in views_from_urls:
@@ -154,21 +150,17 @@ def is_var(segment):
Return the variable name if a url segment is a variable.
Return False or None otherwise.
"""
- if segment.startswith(("<")) and segment.endswith(">"):
+ if segment.startswith("<") and segment.endswith(">"):
return segment.strip("<>")
def to_segments(url_path):
- """
- Return a list o segments from a url path
- """
+ """Return a list o segments from a url path"""
return url_path.strip("/").split("/")
def to_urlpath(segments):
- """
- Return a URL path from list of segments
- """
+ """Return a URL path from list of segments"""
return "/" + "/".join(segments) + "/"
@@ -233,16 +225,12 @@ def model_for_url(url_path):
def checkable_url(view_name, *args, **kwargs):
- """
- Return a URL suitable for testing given a view name and crafted args.
- """
+ """Return a URL suitable for testing given a view name and crafted args."""
return reverse(view_name, args=args, **kwargs)
def build_model(ds, class_):
- """
- Build a random model in dataspace ds. Return the saved model object.
- """
+ """Build a random model in dataspace ds. Return the saved model object."""
maker = ModelMaker(ds)
return maker(class_)
diff --git a/dje/tests/test_command.py b/dje/tests/test_command.py
index 3958f448..14c2763d 100644
--- a/dje/tests/test_command.py
+++ b/dje/tests/test_command.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_external_reference.py b/dje/tests/test_external_reference.py
index a76cfd0b..22f5a284 100644
--- a/dje/tests/test_external_reference.py
+++ b/dje/tests/test_external_reference.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_notification.py b/dje/tests/test_notification.py
index cb42ae40..e351e010 100644
--- a/dje/tests/test_notification.py
+++ b/dje/tests/test_notification.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_registration.py b/dje/tests/test_registration.py
index 7eb9d629..6f38278b 100644
--- a/dje/tests/test_registration.py
+++ b/dje/tests/test_registration.py
@@ -28,9 +28,7 @@
ADMINS=[("admin", "admin@nexb.com")],
)
class DejaCodeUserRegistrationTestCase(TestCase):
- """
- Tests for the dejacode.com registration workflow.
- """
+ """Tests for the dejacode.com registration workflow."""
def setUp(self):
refresh_url_cache()
diff --git a/dje/tests/test_urn.py b/dje/tests/test_urn.py
index 16daa6b6..a150b2ef 100644
--- a/dje/tests/test_urn.py
+++ b/dje/tests/test_urn.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_user.py b/dje/tests/test_user.py
index a9d72731..4fe66020 100644
--- a/dje/tests/test_user.py
+++ b/dje/tests/test_user.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_utils.py b/dje/tests/test_utils.py
index fda56b60..e63675d4 100644
--- a/dje/tests/test_utils.py
+++ b/dje/tests/test_utils.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_validators.py b/dje/tests/test_validators.py
index ac177ffe..846e4c7b 100644
--- a/dje/tests/test_validators.py
+++ b/dje/tests/test_validators.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/test_views.py b/dje/tests/test_views.py
index 0eb94281..20671ac1 100644
--- a/dje/tests/test_views.py
+++ b/dje/tests/test_views.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/tests/tests.py b/dje/tests/tests.py
index dbd154f9..57693ad8 100644
--- a/dje/tests/tests.py
+++ b/dje/tests/tests.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/dje/views.py b/dje/views.py
index 42fd1f7b..2fd38844 100644
--- a/dje/views.py
+++ b/dje/views.py
@@ -1219,6 +1219,150 @@ def get_context_data(self, **kwargs):
return context
+def object_copy_get(request, m2m_formset_class):
+ user_dataspace = request.user.dataspace
+ model_class = get_model_class_from_path(request.path)
+ requested_ids = request.GET.get("ids", "")
+
+ # In case the view is not requested with the proper parameters
+ if not requested_ids:
+ raise Http404
+
+ opts = model_class._meta
+ preserved_filters = get_preserved_filters(
+ request, model_class, parameter_name="_changelist_filters"
+ )
+ changelist_url = reverse(f"admin:{opts.app_label}_{opts.model_name}_changelist")
+ redirect_url = add_preserved_filters(
+ {"preserved_filters": preserved_filters, "opts": opts}, changelist_url
+ )
+
+ # Ids of objects to be copied
+ ids = requested_ids.split(",")
+
+ # Limit the copy to 100 Objects at the time, as it's the number of
+ # Objects we display per page, default value for the list_per_page
+ # of the ModelAdmin
+ COPY_NB_OBJECT_LIMIT = 100
+ if len(ids) > COPY_NB_OBJECT_LIMIT:
+ msg = (
+ f"Maximum of objects that can be copied at once is "
+ f"limited to {COPY_NB_OBJECT_LIMIT} (by system-wide settings)"
+ )
+ messages.warning(request, msg)
+ return redirect(redirect_url)
+
+ # Let's find the Source Dataspace using the first id
+ # This block will redirect the user to the list if the
+ # first id of the list do not exist
+ try:
+ source_object = model_class.objects.get(id=ids[0])
+ except ObjectDoesNotExist:
+ return redirect(redirect_url)
+
+ # No custom permission for 'copy', we use the 'add' one
+ if not has_permission(source_object, request.user, "add"):
+ messages.error(request, _("Sorry you do not have rights to execute this action"))
+ return redirect(redirect_url)
+
+ source = source_object.dataspace
+ # As a non-Reference Dataspace User, I can only use the Reference
+ # data as the source and my Dataspace as the target
+ # As a Reference User, I can choose both, source and target.
+ if user_dataspace.is_reference:
+ # The following is only used when the User is in the Reference
+ targets_from_request = request.GET.getlist("target")
+ # If the target has been set, then we can continue
+ if targets_from_request:
+ data = {"target": targets_from_request}
+ choice_form = MultiDataspaceChoiceForm(source, request.user, data=data)
+ if not choice_form.is_valid():
+ return redirect(redirect_url)
+ targets = choice_form.cleaned_data["target"]
+ # else, we build a form to offer the choice to the user,
+ # choices do not include the current source
+ else:
+ initial = {
+ "ids": requested_ids,
+ "_changelist_filters": dict(parse_qsl(preserved_filters)).get(
+ "_changelist_filters"
+ ),
+ }
+ is_popup = request.GET.get(IS_POPUP_VAR, False)
+ if is_popup:
+ initial["_popup"] = is_popup
+ choice_form = MultiDataspaceChoiceForm(source, request.user, initial=initial)
+ return render(
+ request,
+ "admin/object_copy_dataspace_form.html",
+ {
+ "form": choice_form,
+ "opts": opts,
+ "is_popup": is_popup,
+ "preserved_filters": preserved_filters,
+ },
+ )
+ elif not source.is_reference:
+ # As a non-Reference User my only "external" source of data allowed
+ # is the Reference Dataspace
+ return redirect(redirect_url)
+ else:
+ targets = [user_dataspace]
+
+ # At this stage, we have the Source and Target Dataspaces
+ # Let's see which objects are eligible for copy, or offer the update
+ copy_candidates = []
+ update_candidates = []
+
+ # Building a QuerySet based on the given ids, if an non-authorized or
+ # non-existing id was injected it will be ignored thanks to the
+ # id__in and the dataspace scoping.
+ queryset = model_class.objects.scope(source).filter(id__in=ids)
+
+ for target in targets:
+ for source_instance in queryset:
+ matched_object = get_object_in(source_instance, target)
+ if matched_object:
+ # Inject the source_instance for future usage in the template
+ update_candidates.append((matched_object, source_instance))
+ else:
+ copy_candidates.append((source_instance, target))
+
+ initial = {
+ "source": source,
+ "targets": targets,
+ "ct": ContentType.objects.get_for_model(model_class).id,
+ }
+ form = CopyConfigurationForm(request.user, initial=initial)
+
+ # Many2Many exclude on copy/update
+ m2m_initial = [
+ {"ct": ContentType.objects.get_for_model(m2m_field.remote_field.through).id}
+ for m2m_field in model_class._meta.many_to_many
+ ]
+
+ # Also handle relational fields if explicitly declared on the Model using the
+ # get_extra_relational_fields method.
+ for field_name in model_class.get_extra_relational_fields():
+ related_model = model_class._meta.get_field(field_name).related_model
+ if related_model().get_exclude_candidates_fields():
+ ct = ContentType.objects.get_for_model(related_model)
+ m2m_initial.append({"ct": ct.id})
+
+ return render(
+ request,
+ "admin/object_copy.html",
+ {
+ "copy_candidates": copy_candidates,
+ "update_candidates": update_candidates,
+ "form": form,
+ "m2m_formset": m2m_formset_class(initial=m2m_initial),
+ "opts": source_object._meta,
+ "preserved_filters": preserved_filters,
+ },
+ )
+
+
@login_required
def object_copy_view(request):
"""
@@ -1232,164 +1376,22 @@ def object_copy_view(request):
This result as an extra step of presenting the target Dataspace list of
choices.
"""
- user_dataspace = request.user.dataspace
# Declared here as it required in GET and POST cases.
- M2MConfigurationFormSet = formset_factory(
+ m2m_formset_class = formset_factory(
wraps(M2MCopyConfigurationForm)(partial(M2MCopyConfigurationForm, user=request.user)),
extra=0,
)
- model_class = get_model_class_from_path(request.path)
-
# Default entry point of the view, requested using a GET
# At that stage, we are only looking at what the User requested,
# making sure everything is in order, present him what is going to
# happens and ask for his confirmation.
if request.method == "GET":
- requested_ids = request.GET.get("ids", "")
-
- # In case the view is not requested with the proper parameters
- if not requested_ids:
- raise Http404
-
- opts = model_class._meta
- preserved_filters = get_preserved_filters(
- request, model_class, parameter_name="_changelist_filters"
- )
- changelist_url = reverse(f"admin:{opts.app_label}_{opts.model_name}_changelist")
- redirect_url = add_preserved_filters(
- {"preserved_filters": preserved_filters, "opts": opts}, changelist_url
- )
-
- # Ids of objects to be copied
- ids = requested_ids.split(",")
-
- # Limit the copy to 100 Objects at the time, as it's the number of
- # Objects we display per page, default value for the list_per_page
- # of the ModelAdmin
- COPY_NB_OBJECT_LIMIT = 100
- if len(ids) > COPY_NB_OBJECT_LIMIT:
- msg = (
- f"Maximum of objects that can be copied at once is "
- f"limited to {COPY_NB_OBJECT_LIMIT} (by system-wide settings)"
- )
- messages.warning(request, msg)
- return redirect(redirect_url)
-
- # Let's find the Source Dataspace using the first id
- # This block will redirect the user to the list if the
- # first id of the list do not exist
- try:
- source_object = model_class.objects.get(id=ids[0])
- except ObjectDoesNotExist:
- return redirect(redirect_url)
-
- # No custom permission for 'copy', we use the 'add' one
- if not has_permission(source_object, request.user, "add"):
- messages.error(request, _("Sorry you do not have rights to execute this action"))
- return redirect(redirect_url)
-
- source = source_object.dataspace
- # As a non-Reference Dataspace User, I can only use the Reference
- # data as the source and my Dataspace as the target
- # As a Reference User, I can choose both, source and target.
- if user_dataspace.is_reference:
- # The following is only used when the User is in the Reference
- targets_from_request = request.GET.getlist("target")
- # If the target has been set, then we can continue
- if targets_from_request:
- data = {"target": targets_from_request}
- choice_form = MultiDataspaceChoiceForm(source, request.user, data=data)
- if not choice_form.is_valid():
- return redirect(redirect_url)
- targets = choice_form.cleaned_data["target"]
- # else, we build a form to offer the choice to the user,
- # choices do not include the current source
- else:
- initial = {
- "ids": requested_ids,
- "_changelist_filters": dict(parse_qsl(preserved_filters)).get(
- "_changelist_filters"
- ),
- }
- is_popup = request.GET.get(IS_POPUP_VAR, False)
- if is_popup:
- initial["_popup"] = is_popup
- choice_form = MultiDataspaceChoiceForm(source, request.user, initial=initial)
- return render(
- request,
- "admin/object_copy_dataspace_form.html",
- {
- "form": choice_form,
- "opts": opts,
- "is_popup": is_popup,
- "preserved_filters": preserved_filters,
- },
- )
- elif not source.is_reference:
- # As a non-Reference User my only "external" source of data allowed
- # is the Reference Dataspace
- return redirect(redirect_url)
- else:
- targets = [user_dataspace]
-
- # At this stage, we have the Source and Target Dataspaces
- # Let's see which objects are eligible for copy, or offer the update
- copy_candidates = []
- update_candidates = []
-
- # Building a QuerySet based on the given ids, if an non-authorized or
- # non-existing id was injected it will be ignored thanks to the
- # id__in and the dataspace scoping.
- queryset = model_class.objects.scope(source).filter(id__in=ids)
-
- for target in targets:
- for source_instance in queryset:
- matched_object = get_object_in(source_instance, target)
- if matched_object:
- # Inject the source_instance for future usage in the template
- update_candidates.append((matched_object, source_instance))
- else:
- copy_candidates.append((source_instance, target))
-
- initial = {
- "source": source,
- "targets": targets,
- "ct": ContentType.objects.get_for_model(model_class).id,
- }
- form = CopyConfigurationForm(request.user, initial=initial)
-
- # Many2Many exclude on copy/update
- m2m_initial = [
- {"ct": ContentType.objects.get_for_model(m2m_field.remote_field.through).id}
- for m2m_field in model_class._meta.many_to_many
- ]
-
- # Also handle relational fields if explicitly declared on the Model using the
- # get_extra_relational_fields method.
- for field_name in model_class.get_extra_relational_fields():
- related_model = model_class._meta.get_field(field_name).related_model
- if related_model().get_exclude_candidates_fields():
- ct = ContentType.objects.get_for_model(related_model)
- m2m_initial.append({"ct": ct.id})
-
- m2m_formset = M2MConfigurationFormSet(initial=m2m_initial)
-
- return render(
- request,
- "admin/object_copy.html",
- {
- "copy_candidates": copy_candidates,
- "update_candidates": update_candidates,
- "form": form,
- "m2m_formset": m2m_formset,
- "opts": source_object._meta,
- "preserved_filters": preserved_filters,
- },
- )
+ return object_copy_get(request, m2m_formset_class)
# Second section of the view, following the POST
if request.method == "POST":
+ model_class = get_model_class_from_path(request.path)
config_form = CopyConfigurationForm(request.user, request.POST)
if not config_form.is_valid():
@@ -1407,7 +1409,7 @@ def object_copy_view(request):
exclude_update = {model_class: config_form.cleaned_data.get("exclude_update")}
# Append the m2m copy configuration
- for m2m_form in M2MConfigurationFormSet(request.POST):
+ for m2m_form in m2m_formset_class(request.POST):
if not m2m_form.is_valid():
continue
m2m_model_class = m2m_form.model_class
diff --git a/dje/widgets.py b/dje/widgets.py
index 0f1ebd99..de74b1f5 100644
--- a/dje/widgets.py
+++ b/dje/widgets.py
@@ -46,7 +46,7 @@ def render(self, name, value, attrs=None, renderer=None, choices=()):
if value is None:
value = ""
final_attrs = self.build_attrs(self.attrs, extra_attrs=attrs)
- output = ["
" % flatatt(final_attrs)]
+ output = [f"
"]
options = self.render_options(choices, [value], name)
if options:
output.append(options)
@@ -119,7 +119,7 @@ class SortDropDownWidget(DropDownRightWidget):
"""
-class BootstrapSelectMixin(object):
+class BootstrapSelectMixin:
def __init__(self, attrs=None, choices=(), search=True, header_title="", search_placeholder=""):
self.search = search
self.header_title = header_title
diff --git a/license_library/tests/test_search_quality.py b/license_library/tests/test_search_quality.py
index 03d8751d..102e1112 100644
--- a/license_library/tests/test_search_quality.py
+++ b/license_library/tests/test_search_quality.py
@@ -60,8 +60,7 @@ def get_objects():
qs = qs.filter(name="nexB")
else:
qs = qs.scope_by_name("nexB")
- for obj in qs.iterator():
- yield obj
+ yield from qs.iterator()
serializer = NullPKSerializer()
data = serializer.serialize(
@@ -134,16 +133,15 @@ def do_search(self, query, expected_results, ordered=True, exact=False, field=No
self.assertEqual(
len(expected_results),
len(test_values),
- 'Unordered expected items "%r" differ in length with results items :"%r"'
- % (expected_results, test_values),
+ f'Unordered expected items "{expected_results}" differ in length with '
+ f'results items :"{test_values}"',
)
msgs = []
for i, er in enumerate(expected_results):
if er != "*":
if er not in test_values:
msgs.append(
- 'Unordered expected item "%s" missing in results: "%r"'
- % (er, test_values)
+ f'Unordered expected item "{er}" missing in results: "{test_values}"'
)
if msgs:
self.fail("\n".join(msgs))
@@ -199,7 +197,7 @@ def search_test_func(self):
# build a reasonably unique and valid function name
# based on the up to 50 chars from the query and a row number
test_func_name = "test_search_%04d_" % rownum + python_safe(
- query[:50] + "_%s" % notes if notes else ""
+ query[:50] + str(notes) if notes else ""
)
# these are needed to ensure we can use the tests name for selection in discovery
search_test_func.__name__ = str(test_func_name)
diff --git a/organization/tests/test_admin.py b/organization/tests/test_admin.py
index f2c629df..dde68237 100644
--- a/organization/tests/test_admin.py
+++ b/organization/tests/test_admin.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/organization/tests/test_models.py b/organization/tests/test_models.py
index 0459c5a0..6ac67623 100644
--- a/organization/tests/test_models.py
+++ b/organization/tests/test_models.py
@@ -136,10 +136,7 @@ def _create_simple_owner(self):
return owner
def _obj_copy_basic_for_owner_fails_if_target_is_not_an_dataspace(self, update=False):
- """
- Setup and test with an expected failure to test copy, update and
- noop.
- """
+ """Test with an expected failure to test copy, update and noop."""
owner = self._create_simple_owner()
expected_msg = '"Owner.dataspace" must be a "Dataspace" instance'
try:
@@ -188,7 +185,7 @@ def test_deepcopy_for_owner(self):
self._check_that_copy_succeeded(owner)
def _obj_copy_basic_owner_update(self, update=False):
- """Setup for a basic owner update operation"""
+ """Prepare for a basic owner update operation"""
owner = self._create_simple_owner()
# pre-create an owner with the same name in the target dataspace to
# trigger an update
diff --git a/organization/tests/test_views.py b/organization/tests/test_views.py
index 35b81603..d6fb6dd5 100644
--- a/organization/tests/test_views.py
+++ b/organization/tests/test_views.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# DejaCode is a trademark of nexB Inc.
diff --git a/product_portfolio/importers.py b/product_portfolio/importers.py
index df1e45ec..5f451b9c 100644
--- a/product_portfolio/importers.py
+++ b/product_portfolio/importers.py
@@ -259,7 +259,7 @@ def pre_process_form(self, data, **kwargs):
# Workaround default `dict` value on the model
additional_details = data.get(self.add_prefix("additional_details"))
- if not additional_details or additional_details == dict:
+ if not additional_details:
data[self.add_prefix("additional_details")] = "{}"
try:
diff --git a/product_portfolio/tests/test_attribution.py b/product_portfolio/tests/test_attribution.py
index bb600b95..0625dd57 100644
--- a/product_portfolio/tests/test_attribution.py
+++ b/product_portfolio/tests/test_attribution.py
@@ -308,9 +308,7 @@ def test_attribution_generation_configuration_licensing_in_product_section(self)
License1 AND License2
and the third-party licenses listed below.
- """.format(
- self.product1
- )
+ """.format(self.product1)
self.assertContains(response, expected, html=True)
self.assertContains(response, '