API Documentation
/
Backend
/
Python
/
Integrate in Existing Python Project

Integrate in Existing Python Project

Add Hawcx authentication to an existing Python application

Integrating Hawcx Backend SDK into Existing Python Projects

This guide walks through adding Hawcx OAuth authentication to an existing Python backend.

Step 1: Install the SDK

pip install hawcx-oauth-client

# Or with Flask support
pip install "hawcx-oauth-client[flask]"

Step 2: Set Up Environment Variables

Create a .env file or configure your environment with:

# OAuth Configuration (required)
OAUTH_TOKEN_ENDPOINT=https://example.hawcx.com/oauth2/token
OAUTH_CLIENT_ID=your_client_id
JWKS_URL="https://example.hawcx.com/.well-known/jwks.json"
# Optional but recommended
OAUTH_ISSUER=https://example.hawcx.com
OAUTH_AUDIENCE=your_client_id

# For MFA features (if using delegation client)
SP_ED25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
SP_X25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
IDP_ED25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...
IDP_X25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...

Load environment variables in your application:

import os
from dotenv import load_dotenv

load_dotenv()

Step 3: Create an Authentication Endpoint

Flask

from flask import Flask, request, jsonify, session
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError, JWTVerificationError
import os
import jwt
from datetime import datetime, timedelta

app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')

@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
    try:
        data = request.get_json() or {}
        code = data.get('code')

        if not code:
            return jsonify({'error': 'Missing authorization code'}), 400

        # Exchange the authorization code for verified claims
        claims = exchange_code_for_claims(
            code=code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY'),
            audience=os.getenv('OAUTH_AUDIENCE') or os.getenv('OAUTH_CLIENT_ID'),
            issuer=os.getenv('OAUTH_ISSUER'),
            leeway=10
        )

        # Find or create user in your database
        user = find_or_create_user(
            hawcx_id=claims['sub'],
            email=claims['email'],
            email_verified=claims.get('email_verified', False)
        )

        # Create your application's session/JWT
        session_token = generate_session_token(user)

        return jsonify({
            'success': True,
            'sessionToken': session_token,
            'user': {
                'id': user['id'],
                'email': user['email']
            }
        })

    except OAuthExchangeError as e:
        app.logger.error(f'OAuth exchange failed: {e}')
        return jsonify({'error': 'Authentication failed'}), 401
    except JWTVerificationError as e:
        app.logger.error(f'JWT verification failed: {e}')
        return jsonify({'error': 'Authentication failed'}), 401
    except Exception as e:
        app.logger.error(f'Unexpected error: {e}')
        return jsonify({'error': 'Internal server error'}), 500

Django

# views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import json
import os
import jwt
from datetime import datetime, timedelta

@csrf_exempt
@require_http_methods(["POST"])
def auth_callback(request):
    try:
        data = json.loads(request.body)
        code = data.get('code')

        if not code:
            return JsonResponse({'error': 'Missing authorization code'}, status=400)

        # Exchange the authorization code for verified claims
        claims = exchange_code_for_claims(
            code=code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY')
        )

        # Find or create user
        user = find_or_create_user(
            hawcx_id=claims['sub'],
            email=claims['email']
        )

        # Create session token
        session_token = generate_session_token(user)

        return JsonResponse({
            'success': True,
            'sessionToken': session_token,
            'user': {
                'id': user['id'],
                'email': user['email']
            }
        })

    except OAuthExchangeError as e:
        return JsonResponse({'error': 'Authentication failed'}, status=401)

FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import os
import jwt

app = FastAPI()

class AuthCallbackRequest(BaseModel):
    code: str

@app.post("/api/auth/hawcx/callback")
async def auth_callback(request: AuthCallbackRequest):
    try:
        if not request.code:
            raise HTTPException(status_code=400, detail="Missing authorization code")

        # Exchange the authorization code for verified claims
        claims = exchange_code_for_claims(
            code=request.code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY')
        )

        # Find or create user
        user = find_or_create_user(
            hawcx_id=claims['sub'],
            email=claims['email']
        )

        # Create session token
        session_token = generate_session_token(user)

        return {
            'success': True,
            'sessionToken': session_token,
            'user': {
                'id': user['id'],
                'email': user['email']
            }
        }

    except OAuthExchangeError:
        raise HTTPException(status_code=401, detail="Authentication failed")

Step 4: Integrate with Your User Management

Create a user service module:

# services/user_service.py
from datetime import datetime
import os
import jwt
from typing import Dict, Any, Optional

def find_or_create_user(hawcx_id: str, email: str, email_verified: bool = False) -> Dict[str, Any]:
    """Find existing user or create new one based on Hawcx claims"""
    # Import your database module
    from app.db import db

    # Check if user exists
    user = db.session.query(User).filter_by(hawcx_id=hawcx_id).first()

    if user:
        # Update email_verified if needed
        if email_verified and not user.email_verified:
            user.email_verified = True
            db.session.commit()
        return user.to_dict()

    # Create new user
    new_user = User(
        hawcx_id=hawcx_id,
        email=email,
        email_verified=email_verified,
        created_at=datetime.utcnow(),
        last_login=datetime.utcnow()
    )

    db.session.add(new_user)
    db.session.commit()

    return new_user.to_dict()

def generate_session_token(user: Dict[str, Any]) -> str:
    """Generate JWT session token for user"""
    return jwt.encode(
        {
            'userId': str(user['id']),
            'email': user['email'],
            'hawcxId': user['hawcx_id'],
            'exp': datetime.utcnow() + timedelta(days=7)
        },
        os.getenv('SESSION_SECRET'),
        algorithm='HS256'
    )

def verify_session_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify and decode session token"""
    try:
        return jwt.decode(
            token,
            os.getenv('SESSION_SECRET'),
            algorithms=['HS256']
        )
    except jwt.InvalidTokenError:
        return None

Step 5: Add Authentication Middleware/Decorators

Flask

# middleware/auth.py
from functools import wraps
from flask import request, jsonify
from hawcx_oauth_client import verify_jwt
import os

def require_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization', '').replace('Bearer ', '')

        if not token:
            return jsonify({'error': 'Missing authorization token'}), 401

        try:
            payload = verify_jwt(
                token,
                os.getenv('SESSION_SECRET'),
                algorithms=['HS256']
            )
            request.user = payload
            return f(*args, **kwargs)
        except Exception as e:
            return jsonify({'error': 'Invalid or expired token'}), 401

    return decorated_function

# Usage
@app.route('/api/me', methods=['GET'])
@require_auth
def get_current_user():
    return jsonify(request.user)

FastAPI

# dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthCredentials
import jwt
import os

security = HTTPBearer()

async def get_current_user(credentials: HTTPAuthCredentials = Depends(security)):
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv('SESSION_SECRET'),
            algorithms=['HS256']
        )
        return payload
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token"
        )

# Usage
@app.get("/api/me")
async def get_current_user_endpoint(user = Depends(get_current_user)):
    return user

Django

# decorators.py
from functools import wraps
from django.http import JsonResponse
import jwt
import os

def require_auth(view_func):
    @wraps(view_func)
    def wrapped_view(request, *args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        token = auth_header.replace('Bearer ', '') if auth_header else None

        if not token:
            return JsonResponse({'error': 'Missing authorization token'}, status=401)

        try:
            payload = jwt.decode(
                token,
                os.getenv('SESSION_SECRET'),
                algorithms=['HS256']
            )
            request.user = payload
            return view_func(request, *args, **kwargs)
        except jwt.InvalidTokenError:
            return JsonResponse({'error': 'Invalid or expired token'}, status=401)

    return wrapped_view

# Usage
@require_auth
def get_current_user(request):
    return JsonResponse(request.user)

Step 6: Set Up MFA (Optional)

Create an MFA service:

# services/mfa_service.py
from hawcx_oauth_client.delegation import HawcxDelegationClient, MfaMethod
import os

class MFAService:
    def __init__(self):
        self.client = HawcxDelegationClient.from_keys(
            sp_signing_key=os.getenv('SP_ED25519_PRIVATE_KEY_PEM'),
            sp_encryption_key=os.getenv('SP_X25519_PRIVATE_KEY_PEM'),
            idp_verify_key=os.getenv('IDP_ED25519_PUBLIC_KEY_PEM'),
            idp_encryption_key=os.getenv('IDP_X25519_PUBLIC_KEY_PEM'),
            sp_id=os.getenv('OAUTH_CLIENT_ID')
        )

    def initiate_mfa_setup(self, user_email: str, mfa_method: str, phone_number: str = None):
        """Initiate MFA setup for a user"""
        try:
            result = self.client.initiate_mfa_change(
                userid=user_email,
                mfa_method=MfaMethod(mfa_method),
                phone_number=phone_number
            )
            return result
        except Exception as e:
            raise Exception(f'Failed to initiate MFA: {str(e)}')

    def verify_mfa_code(self, user_email: str, session_id: str, otp: str):
        """Verify MFA code"""
        try:
            result = self.client.verify_mfa_change(
                userid=user_email,
                session_id=session_id,
                otp=otp
            )
            return result
        except Exception as e:
            raise Exception(f'Failed to verify MFA: {str(e)}')

    def get_user_mfa_status(self, user_email: str):
        """Get user's current MFA settings"""
        try:
            creds = self.client.get_user_credentials(user_email)
            return {
                'mfaEnabled': creds.get('mfa_method') != 'none',
                'mfaMethod': creds.get('mfa_method')
            }
        except Exception as e:
            raise Exception(f'Failed to fetch MFA status: {str(e)}')

Create MFA endpoints:

# Flask example
from flask import jsonify
from services.mfa_service import MFAService
from middleware.auth import require_auth

mfa_service = MFAService()

@app.route('/api/mfa/initiate', methods=['POST'])
@require_auth
def initiate_mfa():
    try:
        data = request.get_json()
        user_email = request.user.get('email')

        result = mfa_service.initiate_mfa_setup(
            user_email=user_email,
            mfa_method=data.get('mfaMethod'),
            phone_number=data.get('phoneNumber')
        )

        # Store session ID in cache with TTL
        cache.set(f'mfa_session:{user_email}', result['session_id'], 600)

        return jsonify({
            'success': True,
            'message': 'MFA setup initiated. Check your email/SMS for verification code.'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/api/mfa/verify', methods=['POST'])
@require_auth
def verify_mfa():
    try:
        data = request.get_json()
        user_email = request.user.get('email')

        session_id = cache.get(f'mfa_session:{user_email}')
        if not session_id:
            return jsonify({'error': 'MFA session expired. Please try again.'}), 400

        result = mfa_service.verify_mfa_code(
            user_email=user_email,
            session_id=session_id,
            otp=data.get('otp')
        )

        if result['status'] != 'success':
            return jsonify({'error': 'Invalid verification code'}), 400

        cache.delete(f'mfa_session:{user_email}')

        return jsonify({
            'success': True,
            'message': 'MFA setup completed successfully'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/api/mfa/status', methods=['GET'])
@require_auth
def get_mfa_status():
    try:
        user_email = request.user.get('email')
        status = mfa_service.get_user_mfa_status(user_email)
        return jsonify(status)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

Step 7: Error Handling

Create a centralized error handler:

# utils/errors.py
from hawcx_oauth_client import (
    OAuthExchangeError,
    JWTVerificationError,
    InvalidPublicKeyError
)

def handle_hawcx_error(error: Exception):
    """Convert Hawcx errors to user-friendly messages"""
    if isinstance(error, OAuthExchangeError):
        return {
            'status': 401,
            'message': 'The authorization code is invalid or expired'
        }

    if isinstance(error, JWTVerificationError):
        return {
            'status': 401,
            'message': 'The authentication token could not be verified'
        }

    if isinstance(error, InvalidPublicKeyError):
        return {
            'status': 500,
            'message': 'Server configuration error'
        }

    return {
        'status': 500,
        'message': 'An unexpected error occurred'
    }

Complete Example

Here's a minimal Flask example:

# app.py
from flask import Flask, request, jsonify
from hawcx_oauth_client import exchange_code_for_claims
import os
import jwt
from datetime import datetime, timedelta
from dotenv import load_dotenv

load_dotenv()

app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')

@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
    try:
        data = request.get_json()
        code = data.get('code')

        claims = exchange_code_for_claims(
            code=code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY')
        )

        session_token = jwt.encode(
            {
                'userId': claims['sub'],
                'email': claims['email'],
                'exp': datetime.utcnow() + timedelta(days=7)
            },
            os.getenv('SESSION_SECRET'),
            algorithm='HS256'
        )

        return jsonify({'sessionToken': session_token})

    except Exception as e:
        return jsonify({'error': 'Authentication failed'}), 401

@app.route('/api/me', methods=['GET'])
def get_me():
    token = request.headers.get('Authorization', '').replace('Bearer ', '')
    try:
        payload = jwt.decode(token, os.getenv('SESSION_SECRET'), algorithms=['HS256'])
        return jsonify(payload)
    except:
        return jsonify({'error': 'Unauthorized'}), 401

if __name__ == '__main__':
    app.run(debug=False, port=5000)

Next Steps