Integrating FusionAuth and django-rest-framework

1 👍

✅

The design you describe is good practice: it follows the OAuth 2.0 protocol, with your Django application managing its own sessions independently of the FusionAuth server.

That said, I understand wanting to use the FusionAuth token directly for authentication.

To accomplish this, you could write a custom authentication class in DRF that verifies the FusionAuth token. However, you should avoid calling the FusionAuth server on every incoming request, since that would put extra load on FusionAuth and slow down your Django application.

So here is what you could do:

Verify the token locally with the PyJWT library (check out the PyJWT website).
You can install it with pip or any other Python package manager.

Command:

pip install PyJWT

Now create a custom authentication class. The following code shows how to decode and verify/validate the JWT (JSON Web Token) and create or get the user in Django.

Code:

# Import modules
import jwt
from django.contrib.auth import get_user_model
from django.conf import settings
from rest_framework import authentication, exceptions

# Get user model
User = get_user_model()

# Define the custom authentication class
class FusionAuthJWTAuthentication(authentication.BaseAuthentication):
    def authenticate(self, request):

        # Get the authorization header from the request
        auth_data = authentication.get_authorization_header(request)

        # If the header is missing, return None to signify unauthenticated
        if not auth_data:
            return None

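        # Split the header into the prefix and the token
        parts = auth_data.decode("utf-8").split()

        # Not a Bearer header: return None so other authentication classes can try
        if not parts or parts[0].lower() != "bearer":
            return None

        # A Bearer header must carry exactly one token
        if len(parts) != 2:
            raise exceptions.AuthenticationFailed("Invalid Authorization header.")

        token = parts[1]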

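        try:
            # Decode the JWT and validate its signature and expiry.
            # Note: Replace 'your-secret-key' with your actual FusionAuth JWT secret.
            # Also, update the algorithm based on your FusionAuth configuration.
            # If the token carries an "aud" claim, PyJWT also needs a matching
            # audience=... argument, otherwise it rejects the token.
            payload = jwt.decode(token, "your-secret-key", algorithms=["HS256"])

            # Get or create a user based on the subject of the payload.
            # This assumes the user model has an fa_id field and that the
            # token carries "username" and "email" claims.
            user, created = User.objects.get_or_create(
                fa_id=payload["sub"],
                defaults={
                    "username": payload["username"],
                    "email": payload["email"],
                },
            )

            # Return the user and token for successful authentication
            return (user, token)

        except jwt.ExpiredSignatureError:
            # Handle expiry first: it is a subclass of InvalidTokenError
            raise exceptions.AuthenticationFailed(
                "Your token has expired, login again."
            )

        except jwt.InvalidTokenError:
            # Covers bad signatures, malformed tokens, and other failed claim checks
            raise exceptions.AuthenticationFailed("Your token is invalid, login again.")

        except KeyError as exc:
            # The token verified but lacks a claim this class relies on
            raise exceptions.AuthenticationFailed(f"Token is missing claim: {exc}")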

Now the last step is to add the authentication class to your DRF settings.

Code:

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'path.to.your.FusionAuthJWTAuthentication',
        # Other authentication classes if there are any
    ),
    # Other DRF settings
}

From now on your Django application will authenticate users with the access_token from FusionAuth. Every time you make a request with the Authorization: Bearer header, DRF will use FusionAuthJWTAuthentication to authenticate the user.
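
On the client side, attaching the token could look roughly like this. It is only a sketch using the standard library; the URL and the token value are placeholders I made up, not real endpoints or credentials.

```python
import urllib.request


def build_authenticated_request(url: str, access_token: str) -> urllib.request.Request:
    """Build a GET request that carries the FusionAuth access token."""
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {access_token}"},
    )


# Placeholder values: substitute your own API URL and a real access token.
request = build_authenticated_request("https://api.example.com/items/", "eyJ...")
# urllib.request.urlopen(request) would then send it to the DRF endpoint.
```

Any HTTP client works the same way, as long as the header has the form "Bearer", one space, then the token.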

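Remember to replace your-secret-key in the jwt.decode call with your FusionAuth JWT (JSON Web Token) signing secret. Also update the algorithm if your FusionAuth configuration uses something other than HS256. HS256 is a shared-secret algorithm; an asymmetric one such as RS256 is verified with the public key instead.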
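
To see why the secret and algorithm have to match what FusionAuth uses, here is a rough standard-library sketch of what verifying an HS256 token involves. It is for illustration only and is not a substitute for PyJWT; the function names are my own, and sign_hs256 exists only so the example can be exercised without a FusionAuth server.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    """Encode bytes as unpadded base64url text."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Create an HS256 token (here only to produce test input)."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url_encode(json.dumps(payload).encode())
    mac = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256)
    return f"{header}.{body}.{b64url_encode(mac.digest())}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the payload if the signature matches and the token is unexpired."""
    try:
        header_b64, body_b64, signature_b64 = token.split(".")
        header = json.loads(b64url_decode(header_b64))
        payload = json.loads(b64url_decode(body_b64))
        signature = b64url_decode(signature_b64)
    except ValueError as exc:  # wrong segment count, bad base64, or bad JSON
        raise ValueError("malformed token") from exc

    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise ValueError("malformed token")

    # Only accept the algorithm we expect; never trust the header blindly.
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")

    # Recompute the HMAC over "header.body" and compare in constant time.
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{body_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(signature, expected):
        raise ValueError("bad signature")

    # Reject tokens whose expiry time has passed.
    if "exp" in payload and payload["exp"] <= time.time():
        raise ValueError("token expired")

    return payload
```

A token signed with a different secret fails the HMAC comparison, which is why the placeholder secret has to be replaced with the real one.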

Important: This setup only verifies that the token is correctly signed and not expired. It does not check whether the token has been revoked on the FusionAuth side, so a revoked token keeps working until it expires. Consider this trade-off.
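
If revocation matters to you, one possible middle ground is to ask FusionAuth only occasionally and cache the answer for a short time. The sketch below shows the idea with the standard library. CachedTokenValidator and remote_validate are hypothetical names of mine: remote_validate stands for whatever callable you write to ask FusionAuth about a token, returning its claims or None.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class CachedTokenValidator:
    """Cache remote validation results for a short time.

    A revoked token can still be accepted for up to ttl_seconds, in exchange
    for at most one remote call per token per TTL window.
    """

    def __init__(
        self,
        remote_validate: Callable[[str], Optional[dict]],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._remote_validate = remote_validate
        self._ttl = ttl_seconds
        self._clock = clock
        # Maps token -> (time of the remote check, claims or None)
        self._cache: Dict[str, Tuple[float, Optional[dict]]] = {}

    def validate(self, token: str) -> Optional[dict]:
        now = self._clock()
        cached = self._cache.get(token)
        if cached is not None and now - cached[0] < self._ttl:
            # Fresh enough: reuse the earlier answer (valid or invalid).
            return cached[1]

        # Drop stale entries so old tokens do not accumulate.
        self._cache = {
            key: entry
            for key, entry in self._cache.items()
            if now - entry[0] < self._ttl
        }

        claims = self._remote_validate(token)
        self._cache[token] = (now, claims)
        return claims
```

A shorter TTL narrows the window in which a revoked token still works, at the cost of more calls to FusionAuth. With several Django processes, each one would keep its own copy of this cache unless you move it into a shared store.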

👤 AztecCodes

0 👍

I wound up writing a custom DRF Authentication Class which checks the token against FusionAuth:

from django.conf import settings
from django.contrib.auth import get_user_model
from rest_framework.authentication import (
    HTTP_HEADER_ENCODING,
    BaseAuthentication,
    get_authorization_header,
)
from rest_framework.exceptions import AuthenticationFailed
from fusionauth.fusionauth_client import FusionAuthClient

AUTH_CLIENT = FusionAuthClient(
    settings.FUSIONAUTH_API_KEY, settings.FUSIONAUTH_URL
)

UserModel = get_user_model()


class OAuth2Authentication(BaseAuthentication):
    """
    Class for performing DRF Authentication using OAuth2 via FusionAuth
    """
    def authenticate(self, request):
        """
        Authenticate the request and return a tuple of (user, token) or None
        if there was no authentication attempt.
        """
        access_token = self.get_access_token(request)
        if not access_token:
            return None

        auth_jwt_response = AUTH_CLIENT.validate_jwt(access_token)
        if not auth_jwt_response.was_successful():
            raise AuthenticationFailed(auth_jwt_response.error_response)
        auth_jwt = auth_jwt_response.success_response["jwt"]
        auth_id = auth_jwt["sub"]

        try:
            user = UserModel.objects.active().get(auth_id=auth_id)
        except UserModel.DoesNotExist as e:
            msg = "User does not exist"
            raise AuthenticationFailed(msg) from e

        return user, access_token

    def get_access_token(self, request):
        """
        Get the access token based on a request.

        Returns None if no authentication details were provided. Raises
        AuthenticationFailed if the token is incorrect.
        """
        header = get_authorization_header(request)
        if not header:
            return None
        header = header.decode(HTTP_HEADER_ENCODING)

        auth = header.split()

        # Not a bearer header (or an empty one): let other authenticators try
        if not auth or auth[0].lower() != "bearer":
            return None

        if len(auth) == 1:
            msg = "Invalid 'bearer' header: No credentials provided."
            raise AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = "Invalid 'bearer' header: Credentials string should not contain spaces."
            raise AuthenticationFailed(msg)

        return auth[1]
👤 trubliphone
