Coverage for django_napse/api/keys/views/key_view.py: 32%
78 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
1from django.db.transaction import atomic
2from rest_framework import status
3from rest_framework.decorators import action
4from rest_framework.response import Response
6from django_napse.api.custom_permissions import HasAPIKey, HasMasterKey
7from django_napse.api.custom_viewset import CustomViewSet
8from django_napse.api.keys.serializers import NapseAPIKeySerializer
9from django_napse.api.keys.serializers.key import NapseAPIKeySpaceSerializer
10from django_napse.auth.models import NapseAPIKey
11from django_napse.utils.constants import PERMISSION_TYPES
14class KeyView(CustomViewSet):
15 permission_classes = [HasMasterKey]
16 serializer_class = NapseAPIKeySerializer
18 def get_queryset(self):
19 return NapseAPIKey.objects.all()
21 def get_object(self):
22 return NapseAPIKey.objects.get(prefix=self.kwargs["pk"])
24 def get_permissions(self):
25 match self.action:
26 case "connect" | "possible_permissions":
27 return [HasAPIKey()]
28 case "retrieve":
29 return [HasAPIKey()]
30 case _:
31 return super().get_permissions()
33 def create(self, request):
34 if "name" not in request.data:
35 return Response({"error": "Missing name"}, status=status.HTTP_400_BAD_REQUEST)
37 _, key = NapseAPIKey.objects.create_key(name=request.data["name"], description=request.data.get("description", ""))
39 return Response({"key": key}, status=status.HTTP_201_CREATED)
41 def retrieve(self, request, pk):
42 key = self.get_object()
43 if "space" not in request.query_params:
44 serializer = NapseAPIKeySerializer(key)
45 return Response(serializer.data, status=status.HTTP_200_OK)
46 serializer = NapseAPIKeySpaceSerializer(
47 key,
48 context={
49 "space": request.query_params["space"],
50 },
51 )
52 return Response(serializer.data, status=status.HTTP_200_OK)
54 def list(self, request):
55 keys = self.get_queryset()
56 if "space" not in request.query_params:
57 serializer = NapseAPIKeySerializer(keys, many=True)
58 return Response(serializer.data, status=status.HTTP_200_OK)
60 serializer = NapseAPIKeySpaceSerializer(
61 keys,
62 many=True,
63 context={
64 "space": request.query_params["space"],
65 },
66 )
67 data = {"keys": []}
68 for key in serializer.data:
69 if key["permissions"]:
70 data["keys"].append(key)
71 master_key = NapseAPIKey.objects.get(is_master_key=True)
72 serializer = NapseAPIKeySerializer(master_key)
73 data["master_key"] = serializer.data
74 return Response(data, status=status.HTTP_200_OK)
76 def destroy(self, request, pk):
77 key = self.get_object()
78 key.delete()
79 return Response(status=status.HTTP_204_NO_CONTENT)
81 def patch(self, request, pk):
82 key = self.get_object()
83 with atomic():
84 if "name" in request.data:
85 key.name = request.data["name"]
86 if "description" in request.data:
87 key.description = request.data["description"]
88 if "permissions" in request.data:
89 for permission in key.permissions.filter(space=self.get_space(request)):
90 permission.delete()
91 for permission in request.data["permissions"]:
92 key.add_permission(self.get_space(request), permission)
93 key.save()
94 if request.data.get("revoked", False) and not key.is_master_key:
95 key.revoke()
96 serializer = NapseAPIKeySerializer(key)
97 return Response(serializer.data, status=status.HTTP_200_OK)
99 @action(detail=False, methods=["GET"])
100 def possible_permissions(self, request):
101 return Response(list(PERMISSION_TYPES), status=status.HTTP_200_OK)
103 @action(detail=False, methods=["GET"])
104 def connect(self, request):
105 return Response(status=status.HTTP_204_NO_CONTENT)