Skip to main content

Integrating Hawcx Backend SDK into Existing Python Projects

This guide walks through adding Hawcx OAuth authentication to an existing Python backend.

Step 1: Install the SDK

pip install hawcx-oauth-client

# Or with Flask support
pip install "hawcx-oauth-client[flask]"

Step 2: Set Up Environment Variables

Create a .env file or configure your environment with:
# OAuth Configuration (required)
OAUTH_TOKEN_ENDPOINT=https://oauth.hawcx.com/token
OAUTH_CLIENT_ID=your_client_id
OAUTH_PUBLIC_KEY=-----BEGIN PUBLIC KEY-----\n...

# Optional but recommended
OAUTH_ISSUER=https://oauth.hawcx.com
OAUTH_AUDIENCE=your_client_id

# For MFA features (if using delegation client)
SP_ED25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
SP_X25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
IDP_ED25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...
IDP_X25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...
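Load environment variables in your application (this uses the python-dotenv package, which Step 1 does not install — run `pip install python-dotenv` first):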
import os
from dotenv import load_dotenv

load_dotenv()

Step 3: Create an Authentication Endpoint

Flask

from flask import Flask, request, jsonify, session
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError, JWTVerificationError
import os
import jwt
from datetime import datetime, timedelta

# Create the Flask application and read its secret key from the
# FLASK_SECRET_KEY environment variable.
# NOTE(review): os.getenv returns None when the variable is unset — confirm it
# is always configured before deploying.
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')

@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
    """Exchange a Hawcx authorization code for an application session token.

    Reads ``code`` from the JSON request body, exchanges it for verified
    claims, finds or creates the matching user and responds with a session
    token plus basic user info.

    Responses:
        200: ``{'success': True, 'sessionToken': ..., 'user': {...}}``
        400: the request body carried no ``code``.
        401: the code exchange or the JWT verification failed.
        500: any other exception.
    """
    try:
        body = request.get_json() or {}
        auth_code = body.get('code')
        if not auth_code:
            return jsonify({'error': 'Missing authorization code'}), 400

        # The audience falls back to the client id when OAUTH_AUDIENCE is unset.
        client_id = os.getenv('OAUTH_CLIENT_ID')
        expected_audience = os.getenv('OAUTH_AUDIENCE') or client_id

        # Exchange the authorization code for verified claims
        claims = exchange_code_for_claims(
            code=auth_code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=client_id,
            public_key=os.getenv('OAUTH_PUBLIC_KEY'),
            audience=expected_audience,
            issuer=os.getenv('OAUTH_ISSUER'),
            leeway=10
        )

        # Find or create user in your database
        user = find_or_create_user(
            hawcx_id=claims['sub'],
            email=claims['email'],
            email_verified=claims.get('email_verified', False)
        )

        # Create your application's session/JWT
        session_token = generate_session_token(user)
        user_summary = {'id': user['id'], 'email': user['email']}

        return jsonify({
            'success': True,
            'sessionToken': session_token,
            'user': user_summary
        })

    except (OAuthExchangeError, JWTVerificationError) as err:
        # Both failures yield the same response; only the log line differs.
        if isinstance(err, OAuthExchangeError):
            app.logger.error(f'OAuth exchange failed: {err}')
        else:
            app.logger.error(f'JWT verification failed: {err}')
        return jsonify({'error': 'Authentication failed'}), 401
    except Exception as err:
        app.logger.error(f'Unexpected error: {err}')
        return jsonify({'error': 'Internal server error'}), 500

Django

# views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import json
import os
import jwt
from datetime import datetime, timedelta

@csrf_exempt
@require_http_methods(["POST"])
def auth_callback(request):
    """Exchange a Hawcx authorization code for an application session token.

    Expects a JSON object body containing ``code``.

    Responses:
        200: ``{'success': True, 'sessionToken': ..., 'user': {...}}``
        400: the body is not valid JSON, is not an object, or has no ``code``.
        401: the code exchange or the JWT verification failed.
    """
    # Imported here because this module's top-level import only brings in
    # OAuthExchangeError; the Flask handler treats both errors as a 401.
    from hawcx_oauth_client import JWTVerificationError

    try:
        data = json.loads(request.body)
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes (both ValueError).
        return JsonResponse({'error': 'Invalid JSON body'}, status=400)

    # A JSON array/number/string has no .get(); treat it like a missing code.
    code = data.get('code') if isinstance(data, dict) else None
    if not code:
        return JsonResponse({'error': 'Missing authorization code'}, status=400)

    try:
        # Exchange the authorization code for verified claims
        claims = exchange_code_for_claims(
            code=code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY')
        )
    except (OAuthExchangeError, JWTVerificationError):
        return JsonResponse({'error': 'Authentication failed'}, status=401)

    # Find or create user
    user = find_or_create_user(
        hawcx_id=claims['sub'],
        email=claims['email']
    )

    # Create session token
    session_token = generate_session_token(user)

    return JsonResponse({
        'success': True,
        'sessionToken': session_token,
        'user': {
            'id': user['id'],
            'email': user['email']
        }
    })

FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import os
import jwt

app = FastAPI()

# Request body for the callback endpoint: a single string field, ``code``.
class AuthCallbackRequest(BaseModel):
    code: str  # authorization code that auth_callback passes to exchange_code_for_claims

@app.post("/api/auth/hawcx/callback")
async def auth_callback(request: AuthCallbackRequest):
    """Exchange a Hawcx authorization code for an application session token.

    Args:
        request: Parsed request body carrying the authorization ``code``.

    Returns:
        A dict with ``success``, ``sessionToken`` and a ``user`` summary.

    Raises:
        HTTPException: 400 when ``code`` is empty; 401 when the code exchange
            or the JWT verification fails.
    """
    # Imported here because this snippet's top-level import only brings in
    # OAuthExchangeError; the Flask handler treats both errors as a 401.
    from hawcx_oauth_client import JWTVerificationError

    if not request.code:
        raise HTTPException(status_code=400, detail="Missing authorization code")

    try:
        # Exchange the authorization code for verified claims
        claims = exchange_code_for_claims(
            code=request.code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY')
        )
    except (OAuthExchangeError, JWTVerificationError):
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Find or create user
    user = find_or_create_user(
        hawcx_id=claims['sub'],
        email=claims['email']
    )

    # Create session token
    session_token = generate_session_token(user)

    return {
        'success': True,
        'sessionToken': session_token,
        'user': {
            'id': user['id'],
            'email': user['email']
        }
    }

Step 4: Integrate with Your User Management

Create a user service module:
# services/user_service.py
from datetime import datetime
import os
import jwt
from typing import Dict, Any, Optional

def find_or_create_user(hawcx_id: str, email: str, email_verified: bool = False) -> Dict[str, Any]:
    """Return the user matching ``hawcx_id``, creating the record on first login.

    Args:
        hawcx_id: Lookup key; the auth callbacks pass the ``sub`` claim here.
        email: Email stored when a new user is created.
        email_verified: Verification flag from the claims. It only ever
            upgrades a stored falsy value to ``True``.

    Returns:
        ``to_dict()`` of the existing or newly created user.
    """
    # Import your database module
    from app.db import db
    # NOTE(review): `User` is not imported anywhere in this snippet — import
    # your ORM model alongside `db`.

    # One timestamp per call so created_at and last_login agree exactly.
    now = datetime.utcnow()

    # Check if user exists
    user = db.session.query(User).filter_by(hawcx_id=hawcx_id).first()

    if user:
        # This function runs on every login, so refresh last_login here;
        # previously it was only written when the row was created.
        user.last_login = now
        # Update email_verified if needed
        if email_verified and not user.email_verified:
            user.email_verified = True
        db.session.commit()
        return user.to_dict()

    # Create new user
    new_user = User(
        hawcx_id=hawcx_id,
        email=email,
        email_verified=email_verified,
        created_at=now,
        last_login=now
    )

    db.session.add(new_user)
    db.session.commit()

    return new_user.to_dict()

def generate_session_token(user: Dict[str, Any]) -> str:
    """Generate an HS256 JWT session token for ``user``, valid for 7 days.

    Args:
        user: Mapping with ``id``, ``email`` and ``hawcx_id`` keys.

    Returns:
        The encoded token, signed with the ``SESSION_SECRET`` environment
        variable.
    """
    # This module only imports `datetime` from the datetime package, so the
    # original `timedelta` reference raised NameError; import it locally.
    from datetime import timedelta, timezone

    # Timezone-aware "now" replaces the deprecated datetime.utcnow().
    expires_at = datetime.now(timezone.utc) + timedelta(days=7)

    return jwt.encode(
        {
            'userId': str(user['id']),
            'email': user['email'],
            'hawcxId': user['hawcx_id'],
            'exp': expires_at
        },
        os.getenv('SESSION_SECRET'),
        algorithm='HS256'
    )

def verify_session_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode an HS256 session token signed with ``SESSION_SECRET``.

    Returns:
        The decoded payload, or ``None`` when decoding raises
        ``jwt.InvalidTokenError``.
    """
    secret = os.getenv('SESSION_SECRET')
    try:
        claims = jwt.decode(token, secret, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        return None
    return claims

Step 5: Add Authentication Middleware/Decorators

Flask

# middleware/auth.py
from functools import wraps
from flask import request, jsonify
from hawcx_oauth_client import verify_jwt
import os

def require_auth(f):
    """Decorator that requires a valid session token on the request.

    The token is read from the ``Authorization`` header, decoded as an HS256
    JWT signed with ``SESSION_SECRET``, and its payload is stored on
    ``request.user`` before the wrapped view runs.

    Responds with 401 when the header is missing/empty or the token cannot be
    decoded. Exceptions raised by the wrapped view itself are NOT converted to
    401; they propagate to Flask's normal error handling.
    """
    # Session tokens are produced with jwt.encode(..., algorithm='HS256'), so
    # they are decoded with the same module here (as the FastAPI and Django
    # variants do) rather than with the SDK's verify_jwt helper.
    import jwt

    @wraps(f)
    def decorated_function(*args, **kwargs):
        header = request.headers.get('Authorization', '')
        # Strip only a leading "Bearer " scheme; a bare token is still accepted.
        token = header[len('Bearer '):] if header.startswith('Bearer ') else header
        token = token.strip()

        if not token:
            return jsonify({'error': 'Missing authorization token'}), 401

        # Keep the try body to the decode call so view errors are not masked.
        try:
            payload = jwt.decode(
                token,
                os.getenv('SESSION_SECRET'),
                algorithms=['HS256']
            )
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid or expired token'}), 401

        request.user = payload
        return f(*args, **kwargs)

    return decorated_function

# Usage
@app.route('/api/me', methods=['GET'])
@require_auth
def get_current_user():
    """Return, as JSON, the token payload that ``require_auth`` stored on ``request.user``."""
    return jsonify(request.user)

FastAPI

# dependencies.py
from fastapi import Depends, HTTPException, status
# The credentials class exported by fastapi.security is
# HTTPAuthorizationCredentials; importing "HTTPAuthCredentials" raises ImportError.
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import os

# Security scheme used as a dependency to obtain the bearer credentials.
security = HTTPBearer()

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode the bearer session token and return its payload.

    Args:
        credentials: Bearer credentials supplied by the ``security``
            dependency; ``credentials.credentials`` is the raw token.

    Returns:
        The decoded JWT payload.

    Raises:
        HTTPException: 401 when decoding raises ``jwt.InvalidTokenError``.
    """
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv('SESSION_SECRET'),
            algorithms=['HS256']
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token"
        )
    return payload

# Usage
@app.get("/api/me")
async def get_current_user_endpoint(user = Depends(get_current_user)):
    """Return the decoded token payload supplied by the ``get_current_user`` dependency."""
    return user

Django

# decorators.py
from functools import wraps
from django.http import JsonResponse
import jwt
import os

def require_auth(view_func):
    """Decorator that requires a valid session token on the request.

    The token is read from the ``Authorization`` header (any ``'Bearer '``
    text removed), decoded as an HS256 JWT signed with ``SESSION_SECRET``,
    and its payload is assigned to ``request.user`` before the view runs.

    Responds with 401 JSON when the token is missing or cannot be decoded.
    Exceptions raised by the wrapped view itself propagate unchanged.
    """
    @wraps(view_func)
    def wrapped_view(request, *args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        token = auth_header.replace('Bearer ', '') if auth_header else None

        if not token:
            return JsonResponse({'error': 'Missing authorization token'}, status=401)

        # Only the decode call is guarded, so an InvalidTokenError raised
        # inside the view is not misreported as a bad session token.
        try:
            payload = jwt.decode(
                token,
                os.getenv('SESSION_SECRET'),
                algorithms=['HS256']
            )
        except jwt.InvalidTokenError:
            return JsonResponse({'error': 'Invalid or expired token'}, status=401)

        request.user = payload
        return view_func(request, *args, **kwargs)

    return wrapped_view

# Usage
@require_auth
def get_current_user(request):
    """Return, as JSON, the token payload that ``require_auth`` stored on ``request.user``."""
    return JsonResponse(request.user)

Step 6: Set Up MFA (Optional)

Create an MFA service:
# services/mfa_service.py
from hawcx_oauth_client.delegation import HawcxDelegationClient, MfaMethod
import os

class MFAService:
    """Thin wrapper around ``HawcxDelegationClient`` for MFA operations.

    Every method re-raises client failures as a plain ``Exception`` with a
    descriptive prefix, chained to the original error.
    """

    def __init__(self):
        """Build the delegation client from PEM keys in the environment."""
        self.client = HawcxDelegationClient.from_keys(
            sp_signing_key=os.getenv('SP_ED25519_PRIVATE_KEY_PEM'),
            sp_encryption_key=os.getenv('SP_X25519_PRIVATE_KEY_PEM'),
            idp_verify_key=os.getenv('IDP_ED25519_PUBLIC_KEY_PEM'),
            idp_encryption_key=os.getenv('IDP_X25519_PUBLIC_KEY_PEM'),
            sp_id=os.getenv('OAUTH_CLIENT_ID')
        )

    def initiate_mfa_setup(self, user_email: str, mfa_method: str, phone_number: str = None):
        """Initiate MFA setup for a user.

        Args:
            user_email: Passed to the client as ``userid``.
            mfa_method: Value converted with ``MfaMethod(mfa_method)``.
            phone_number: Optional phone number forwarded to the client.

        Returns:
            Whatever ``client.initiate_mfa_change`` returns.

        Raises:
            Exception: ``'Failed to initiate MFA: ...'`` on any failure.
        """
        try:
            return self.client.initiate_mfa_change(
                userid=user_email,
                mfa_method=MfaMethod(mfa_method),
                phone_number=phone_number
            )
        except Exception as e:
            raise Exception(f'Failed to initiate MFA: {str(e)}') from e

    def verify_mfa_code(self, user_email: str, session_id: str, otp: str):
        """Verify an MFA code for a pending MFA change.

        Returns:
            Whatever ``client.verify_mfa_change`` returns.

        Raises:
            Exception: ``'Failed to verify MFA: ...'`` on any failure.
        """
        try:
            return self.client.verify_mfa_change(
                userid=user_email,
                session_id=session_id,
                otp=otp
            )
        except Exception as e:
            raise Exception(f'Failed to verify MFA: {str(e)}') from e

    def get_user_mfa_status(self, user_email: str):
        """Get the user's current MFA settings.

        Returns:
            ``{'mfaEnabled': bool, 'mfaMethod': <value of 'mfa_method'>}``.
            ``mfaEnabled`` is False when the method is ``'none'`` or absent.

        Raises:
            Exception: ``'Failed to fetch MFA status: ...'`` on any failure.
        """
        try:
            creds = self.client.get_user_credentials(user_email)
            method = creds.get('mfa_method')
            # A missing/empty method must not count as "enabled"; the old
            # `!= 'none'` comparison reported True for None.
            return {
                'mfaEnabled': bool(method) and method != 'none',
                'mfaMethod': method
            }
        except Exception as e:
            raise Exception(f'Failed to fetch MFA status: {str(e)}') from e
Create MFA endpoints:
# Flask example
# `request` is read by every endpoint below, so it is imported with jsonify.
from flask import jsonify, request
from services.mfa_service import MFAService
from middleware.auth import require_auth

# NOTE(review): the endpoints below also reference `app` and `cache`, which
# this snippet does not define — supply them from your application.

# Single service instance shared by the MFA endpoints below.
mfa_service = MFAService()

@app.route('/api/mfa/initiate', methods=['POST'])
@require_auth
def initiate_mfa():
    """Start MFA setup for the authenticated user.

    Reads ``mfaMethod`` (required) and ``phoneNumber`` (optional) from the
    JSON body, starts the MFA change and stores the returned session id in
    the cache under ``mfa_session:<email>`` with a TTL value of 600.

    Responses:
        200: setup initiated.
        400: ``mfaMethod`` missing, or any failure while initiating.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising; fall back to an empty dict so .get() is always safe.
        data = request.get_json(silent=True) or {}
        mfa_method = data.get('mfaMethod')
        if not mfa_method:
            return jsonify({'error': 'Missing mfaMethod'}), 400

        user_email = request.user.get('email')

        result = mfa_service.initiate_mfa_setup(
            user_email=user_email,
            mfa_method=mfa_method,
            phone_number=data.get('phoneNumber')
        )

        # Store session ID in cache with TTL
        cache.set(f'mfa_session:{user_email}', result['session_id'], 600)

        return jsonify({
            'success': True,
            'message': 'MFA setup initiated. Check your email/SMS for verification code.'
        })

    except Exception as e:
        # NOTE(review): str(e) may expose internal error details to the client.
        return jsonify({'error': str(e)}), 400

@app.route('/api/mfa/verify', methods=['POST'])
@require_auth
def verify_mfa():
    """Complete MFA setup by verifying the code the user received.

    Reads ``otp`` from the JSON body and the pending session id from the
    cache key ``mfa_session:<email>``; the cache entry is deleted once the
    verification result has status ``'success'``.

    Responses:
        200: MFA setup completed.
        400: ``otp`` missing, no cached session, wrong code, or any failure.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising; fall back to an empty dict so .get() is always safe.
        data = request.get_json(silent=True) or {}
        otp = data.get('otp')
        if not otp:
            return jsonify({'error': 'Missing verification code'}), 400

        user_email = request.user.get('email')

        session_id = cache.get(f'mfa_session:{user_email}')
        if not session_id:
            return jsonify({'error': 'MFA session expired. Please try again.'}), 400

        result = mfa_service.verify_mfa_code(
            user_email=user_email,
            session_id=session_id,
            otp=otp
        )

        if result['status'] != 'success':
            return jsonify({'error': 'Invalid verification code'}), 400

        cache.delete(f'mfa_session:{user_email}')

        return jsonify({
            'success': True,
            'message': 'MFA setup completed successfully'
        })

    except Exception as e:
        # NOTE(review): str(e) may expose internal error details to the client.
        return jsonify({'error': str(e)}), 400

@app.route('/api/mfa/status', methods=['GET'])
@require_auth
def get_mfa_status():
    """Return the authenticated user's MFA status as JSON (500 on any failure)."""
    try:
        email = request.user.get('email')
        return jsonify(mfa_service.get_user_mfa_status(email))
    except Exception as err:
        return jsonify({'error': str(err)}), 500

Step 7: Error Handling

Create a centralized error handler:
# utils/errors.py
from hawcx_oauth_client import (
    OAuthExchangeError,
    JWTVerificationError,
    InvalidPublicKeyError
)

def handle_hawcx_error(error: Exception):
    """Map an exception to a ``{'status': int, 'message': str}`` dict.

    The three Hawcx error types get specific responses; anything else maps to
    a generic 500.
    """
    # Checked top to bottom; the first matching type wins.
    known_errors = (
        (OAuthExchangeError, 401, 'The authorization code is invalid or expired'),
        (JWTVerificationError, 401, 'The authentication token could not be verified'),
        (InvalidPublicKeyError, 500, 'Server configuration error'),
    )

    for error_type, status, message in known_errors:
        if isinstance(error, error_type):
            return {'status': status, 'message': message}

    return {'status': 500, 'message': 'An unexpected error occurred'}

Complete Example

Here’s a minimal Flask example:
# app.py
from flask import Flask, request, jsonify
from hawcx_oauth_client import exchange_code_for_claims
import os
import jwt
from datetime import datetime, timedelta
from dotenv import load_dotenv

load_dotenv()

# Create the Flask application and read its secret key from the
# FLASK_SECRET_KEY environment variable (None when the variable is unset).
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')

@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
    """Exchange a Hawcx authorization code for a 7-day HS256 session token.

    Responses:
        200: ``{'sessionToken': ...}``
        400: the JSON body is missing, not an object, or has no ``code``.
        401: the exchange (or token creation) raised any exception.
    """
    # silent=True returns None for a missing/invalid JSON body instead of
    # raising, so the request is rejected with 400 before any exchange attempt.
    data = request.get_json(silent=True) or {}
    code = data.get('code') if isinstance(data, dict) else None
    if not code:
        return jsonify({'error': 'Missing authorization code'}), 400

    try:
        claims = exchange_code_for_claims(
            code=code,
            oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
            client_id=os.getenv('OAUTH_CLIENT_ID'),
            public_key=os.getenv('OAUTH_PUBLIC_KEY')
        )

        session_token = jwt.encode(
            {
                'userId': claims['sub'],
                'email': claims['email'],
                'exp': datetime.utcnow() + timedelta(days=7)
            },
            os.getenv('SESSION_SECRET'),
            algorithm='HS256'
        )

        return jsonify({'sessionToken': session_token})

    except Exception:
        # Minimal example: every failure is reported as an auth failure.
        return jsonify({'error': 'Authentication failed'}), 401

@app.route('/api/me', methods=['GET'])
def get_me():
    """Return the decoded session-token payload, or 401 when it is invalid.

    The token is taken from the ``Authorization`` header with any
    ``'Bearer '`` text removed and decoded as an HS256 JWT signed with
    ``SESSION_SECRET``.
    """
    token = request.headers.get('Authorization', '').replace('Bearer ', '')
    try:
        payload = jwt.decode(token, os.getenv('SESSION_SECRET'), algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Narrowed from a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit and unrelated programming errors.
        return jsonify({'error': 'Unauthorized'}), 401
    return jsonify(payload)

# Run the app on port 5000 with debug disabled, only when this file is
# executed directly rather than imported.
if __name__ == '__main__':
    app.run(debug=False, port=5000)

Next Steps