diff --git a/netbox/dcim/filtersets.py b/netbox/dcim/filtersets.py index 776021af1e1..68edc93f6fe 100644 --- a/netbox/dcim/filtersets.py +++ b/netbox/dcim/filtersets.py @@ -1,7 +1,9 @@ import django_filters from django.contrib.auth import get_user_model +from django.contrib.contenttypes.models import ContentType from django.utils.translation import gettext as _ +from circuits.models import CircuitTermination from extras.filtersets import LocalConfigContextFilterSet from extras.models import ConfigTemplate from ipam.filtersets import PrimaryIPFilterSet @@ -1804,6 +1806,35 @@ class CableFilterSet(TenancyFilterSet, NetBoxModelFilterSet): field_name='site__slug' ) + # Termination object filters + consoleport_id = MultiValueNumberFilter( + method='filter_by_consoleport' + ) + consoleserverport_id = MultiValueNumberFilter( + method='filter_by_consoleserverport' + ) + powerport_id = MultiValueNumberFilter( + method='filter_by_powerport' + ) + poweroutlet_id = MultiValueNumberFilter( + method='filter_by_poweroutlet' + ) + interface_id = MultiValueNumberFilter( + method='filter_by_interface' + ) + frontport_id = MultiValueNumberFilter( + method='filter_by_frontport' + ) + rearport_id = MultiValueNumberFilter( + method='filter_by_rearport' + ) + powerfeed_id = MultiValueNumberFilter( + method='filter_by_powerfeed' + ) + circuittermination_id = MultiValueNumberFilter( + method='filter_by_circuittermination' + ) + class Meta: model = Cable fields = ['id', 'label', 'length', 'length_unit', 'description'] @@ -1847,6 +1878,42 @@ def _unterminated(self, queryset, name, value): terminations__cable_end=CableEndChoices.SIDE_B ) + def filter_by_termination_object(self, queryset, model, value): + # Filter by specific termination object(s) + content_type = ContentType.objects.get_for_model(model) + cable_ids = CableTermination.objects.filter( + termination_type=content_type, + termination_id__in=value + ).values_list('cable', flat=True) + return queryset.filter(pk__in=cable_ids) + + def filter_by_consoleport(self, queryset, name, value): + return self.filter_by_termination_object(queryset, ConsolePort, value) + + def filter_by_consoleserverport(self, queryset, name, value): + return self.filter_by_termination_object(queryset, ConsoleServerPort, value) + + def filter_by_powerport(self, queryset, name, value): + return self.filter_by_termination_object(queryset, PowerPort, value) + + def filter_by_poweroutlet(self, queryset, name, value): + return self.filter_by_termination_object(queryset, PowerOutlet, value) + + def filter_by_interface(self, queryset, name, value): + return self.filter_by_termination_object(queryset, Interface, value) + + def filter_by_frontport(self, queryset, name, value): + return self.filter_by_termination_object(queryset, FrontPort, value) + + def filter_by_rearport(self, queryset, name, value): + return self.filter_by_termination_object(queryset, RearPort, value) + + def filter_by_powerfeed(self, queryset, name, value): + return self.filter_by_termination_object(queryset, PowerFeed, value) + + def filter_by_circuittermination(self, queryset, name, value): + return self.filter_by_termination_object(queryset, CircuitTermination, value) + class CableTerminationFilterSet(BaseFilterSet): termination_type = ContentTypeFilter() diff --git a/netbox/dcim/tests/test_filtersets.py b/netbox/dcim/tests/test_filtersets.py index d941b16584b..89d15a0ef14 100644 --- a/netbox/dcim/tests/test_filtersets.py +++ b/netbox/dcim/tests/test_filtersets.py @@ -1,6 +1,7 @@ from django.contrib.auth import get_user_model from django.test import TestCase +from circuits.models import Circuit, CircuitTermination, CircuitType, Provider from dcim.choices import * from dcim.filtersets import * from dcim.models import * @@ -4714,6 +4715,23 @@ def setUpTestData(cls): console_port = ConsolePort.objects.create(device=devices[0], name='Console Port 1') console_server_port = ConsoleServerPort.objects.create(device=devices[0], name='Console Server Port 1') + power_port = PowerPort.objects.create(device=devices[0], name='Power Port 1') + power_outlet = PowerOutlet.objects.create(device=devices[0], name='Power Outlet 1') + rear_port = RearPort.objects.create(device=devices[0], name='Rear Port 1', positions=1) + front_port = FrontPort.objects.create( + device=devices[0], + name='Front Port 1', + rear_port=rear_port, + rear_port_position=1 + ) + + power_panel = PowerPanel.objects.create(name='Power Panel 1', site=sites[0]) + power_feed = PowerFeed.objects.create(name='Power Feed 1', power_panel=power_panel) + + provider = Provider.objects.create(name='Provider 1', slug='provider-1') + circuit_type = CircuitType.objects.create(name='Circuit Type 1', slug='circuit-type-1') + circuit = Circuit.objects.create(cid='Circuit 1', provider=provider, type=circuit_type) + circuit_termination = CircuitTermination.objects.create(circuit=circuit, term_side='A', site=sites[0]) # Cables cables = ( @@ -4786,18 +4804,39 @@ def setUpTestData(cls): length=20, length_unit=CableLengthUnitChoices.UNIT_METER ), + + # Cables for filtering by termination object Cable( a_terminations=[console_port], - b_terminations=[console_server_port], label='Cable 7' ), - - # Cable for unterminated test Cable( - a_terminations=[interfaces[12]], - label='Cable 8', - type=CableTypeChoices.TYPE_CAT6, - status=LinkStatusChoices.STATUS_DECOMMISSIONING + a_terminations=[console_server_port], + label='Cable 8' + ), + Cable( + a_terminations=[power_port], + label='Cable 9' + ), + Cable( + a_terminations=[power_outlet], + label='Cable 10' + ), + Cable( + a_terminations=[front_port], + label='Cable 11' + ), + Cable( + a_terminations=[rear_port], + label='Cable 12' + ), + Cable( + a_terminations=[power_feed], + label='Cable 13' + ), + Cable( + a_terminations=[circuit_termination], + label='Cable 14' ), ) for cable in cables: @@ -4825,7 +4864,7 @@ def test_type(self): def test_status(self): params = {'status': [LinkStatusChoices.STATUS_CONNECTED]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 11) params = {'status': [LinkStatusChoices.STATUS_PLANNED]} self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3) @@ -4840,30 +4879,30 @@ def test_description(self): def test_device(self): devices = Device.objects.all()[:2] params = {'device_id': [devices[0].pk, devices[1].pk]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 9) params = {'device': [devices[0].name, devices[1].name]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 9) def test_rack(self): racks = Rack.objects.all()[:2] params = {'rack_id': [racks[0].pk, racks[1].pk]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 11) params = {'rack': [racks[0].name, racks[1].name]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 11) def test_location(self): locations = Location.objects.all()[:2] params = {'location_id': [locations[0].pk, locations[1].pk]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 11) params = {'location': [locations[0].name, locations[1].name]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 11) def test_site(self): site = Site.objects.all()[:2] params = {'site_id': [site[0].pk, site[1].pk]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 12) params = {'site': [site[0].slug, site[1].slug]} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 12) def test_tenant(self): tenant = Tenant.objects.all()[:2] @@ -4875,8 +4914,8 @@ def test_tenant(self): def test_termination_types(self): params = {'termination_a_type': 'dcim.consoleport'} self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) - params = {'termination_b_type': 'dcim.consoleserverport'} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + # params = {'termination_b_type': 'dcim.consoleserverport'} + # self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) def test_termination_ids(self): interface_ids = CableTermination.objects.filter( @@ -4891,9 +4930,41 @@ def test_termination_ids(self): def test_unterminated(self): params = {'unterminated': True} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8) params = {'unterminated': False} - self.assertEqual(self.filterset(params, self.queryset).qs.count(), 7) + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6) + + def test_consoleport(self): + params = {'consoleport_id': [ConsolePort.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_consoleserverport(self): + params = {'consoleserverport_id': [ConsoleServerPort.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_powerport(self): + params = {'powerport_id': [PowerPort.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_poweroutlet(self): + params = {'poweroutlet_id': [PowerOutlet.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_frontport(self): + params = {'frontport_id': [FrontPort.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_rearport(self): + params = {'rearport_id': [RearPort.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_powerfeed(self): + params = {'powerfeed_id': [PowerFeed.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + + def test_circuittermination(self): + params = {'circuittermination_id': [CircuitTermination.objects.first().pk]} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) class PowerPanelTestCase(TestCase, ChangeLoggedFilterSetTests):