Integrate in Existing Python Project
Add Hawcx authentication to an existing Python application
Integrating Hawcx Backend SDK into Existing Python Projects
This guide walks through adding Hawcx OAuth authentication to an existing Python backend.
Step 1: Install the SDK
pip install hawcx-oauth-client
# Or with Flask support
pip install "hawcx-oauth-client[flask]"Step 2: Set Up Environment Variables
Create a .env file or configure your environment with:
# OAuth Configuration (required)
OAUTH_TOKEN_ENDPOINT=https://example.hawcx.com/oauth2/token
OAUTH_CLIENT_ID=your_client_id
JWKS_URL="https://example.hawcx.com/.well-known/jwks.json"
# Optional but recommended
OAUTH_ISSUER=https://example.hawcx.com
OAUTH_AUDIENCE=your_client_id
# For MFA features (if using delegation client)
SP_ED25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
SP_X25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
IDP_ED25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...
IDP_X25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...Load environment variables in your application:
import os
from dotenv import load_dotenv
load_dotenv()Step 3: Create an Authentication Endpoint
Flask
from flask import Flask, request, jsonify, session
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError, JWTVerificationError
import os
import jwt
from datetime import datetime, timedelta
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')
@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
try:
data = request.get_json() or {}
code = data.get('code')
if not code:
return jsonify({'error': 'Missing authorization code'}), 400
# Exchange the authorization code for verified claims
claims = exchange_code_for_claims(
code=code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY'),
audience=os.getenv('OAUTH_AUDIENCE') or os.getenv('OAUTH_CLIENT_ID'),
issuer=os.getenv('OAUTH_ISSUER'),
leeway=10
)
# Find or create user in your database
user = find_or_create_user(
hawcx_id=claims['sub'],
email=claims['email'],
email_verified=claims.get('email_verified', False)
)
# Create your application's session/JWT
session_token = generate_session_token(user)
return jsonify({
'success': True,
'sessionToken': session_token,
'user': {
'id': user['id'],
'email': user['email']
}
})
except OAuthExchangeError as e:
app.logger.error(f'OAuth exchange failed: {e}')
return jsonify({'error': 'Authentication failed'}), 401
except JWTVerificationError as e:
app.logger.error(f'JWT verification failed: {e}')
return jsonify({'error': 'Authentication failed'}), 401
except Exception as e:
app.logger.error(f'Unexpected error: {e}')
return jsonify({'error': 'Internal server error'}), 500Django
# views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import json
import os
import jwt
from datetime import datetime, timedelta
@csrf_exempt
@require_http_methods(["POST"])
def auth_callback(request):
try:
data = json.loads(request.body)
code = data.get('code')
if not code:
return JsonResponse({'error': 'Missing authorization code'}, status=400)
# Exchange the authorization code for verified claims
claims = exchange_code_for_claims(
code=code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY')
)
# Find or create user
user = find_or_create_user(
hawcx_id=claims['sub'],
email=claims['email']
)
# Create session token
session_token = generate_session_token(user)
return JsonResponse({
'success': True,
'sessionToken': session_token,
'user': {
'id': user['id'],
'email': user['email']
}
})
except OAuthExchangeError as e:
return JsonResponse({'error': 'Authentication failed'}, status=401)FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import os
import jwt
app = FastAPI()
class AuthCallbackRequest(BaseModel):
code: str
@app.post("/api/auth/hawcx/callback")
async def auth_callback(request: AuthCallbackRequest):
try:
if not request.code:
raise HTTPException(status_code=400, detail="Missing authorization code")
# Exchange the authorization code for verified claims
claims = exchange_code_for_claims(
code=request.code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY')
)
# Find or create user
user = find_or_create_user(
hawcx_id=claims['sub'],
email=claims['email']
)
# Create session token
session_token = generate_session_token(user)
return {
'success': True,
'sessionToken': session_token,
'user': {
'id': user['id'],
'email': user['email']
}
}
except OAuthExchangeError:
raise HTTPException(status_code=401, detail="Authentication failed")Step 4: Integrate with Your User Management
Create a user service module:
# services/user_service.py
from datetime import datetime
import os
import jwt
from typing import Dict, Any, Optional
def find_or_create_user(hawcx_id: str, email: str, email_verified: bool = False) -> Dict[str, Any]:
"""Find existing user or create new one based on Hawcx claims"""
# Import your database module
from app.db import db
# Check if user exists
user = db.session.query(User).filter_by(hawcx_id=hawcx_id).first()
if user:
# Update email_verified if needed
if email_verified and not user.email_verified:
user.email_verified = True
db.session.commit()
return user.to_dict()
# Create new user
new_user = User(
hawcx_id=hawcx_id,
email=email,
email_verified=email_verified,
created_at=datetime.utcnow(),
last_login=datetime.utcnow()
)
db.session.add(new_user)
db.session.commit()
return new_user.to_dict()
def generate_session_token(user: Dict[str, Any]) -> str:
"""Generate JWT session token for user"""
return jwt.encode(
{
'userId': str(user['id']),
'email': user['email'],
'hawcxId': user['hawcx_id'],
'exp': datetime.utcnow() + timedelta(days=7)
},
os.getenv('SESSION_SECRET'),
algorithm='HS256'
)
def verify_session_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify and decode session token"""
try:
return jwt.decode(
token,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
except jwt.InvalidTokenError:
return NoneStep 5: Add Authentication Middleware/Decorators
Flask
# middleware/auth.py
from functools import wraps
from flask import request, jsonify
from hawcx_oauth_client import verify_jwt
import os
def require_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return jsonify({'error': 'Missing authorization token'}), 401
try:
payload = verify_jwt(
token,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
request.user = payload
return f(*args, **kwargs)
except Exception as e:
return jsonify({'error': 'Invalid or expired token'}), 401
return decorated_function
# Usage
@app.route('/api/me', methods=['GET'])
@require_auth
def get_current_user():
return jsonify(request.user)FastAPI
# dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthCredentials
import jwt
import os
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthCredentials = Depends(security)):
try:
payload = jwt.decode(
credentials.credentials,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
return payload
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
)
# Usage
@app.get("/api/me")
async def get_current_user_endpoint(user = Depends(get_current_user)):
return userDjango
# decorators.py
from functools import wraps
from django.http import JsonResponse
import jwt
import os
def require_auth(view_func):
@wraps(view_func)
def wrapped_view(request, *args, **kwargs):
auth_header = request.headers.get('Authorization', '')
token = auth_header.replace('Bearer ', '') if auth_header else None
if not token:
return JsonResponse({'error': 'Missing authorization token'}, status=401)
try:
payload = jwt.decode(
token,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
request.user = payload
return view_func(request, *args, **kwargs)
except jwt.InvalidTokenError:
return JsonResponse({'error': 'Invalid or expired token'}, status=401)
return wrapped_view
# Usage
@require_auth
def get_current_user(request):
return JsonResponse(request.user)Step 6: Set Up MFA (Optional)
Create an MFA service:
# services/mfa_service.py
from hawcx_oauth_client.delegation import HawcxDelegationClient, MfaMethod
import os
class MFAService:
def __init__(self):
self.client = HawcxDelegationClient.from_keys(
sp_signing_key=os.getenv('SP_ED25519_PRIVATE_KEY_PEM'),
sp_encryption_key=os.getenv('SP_X25519_PRIVATE_KEY_PEM'),
idp_verify_key=os.getenv('IDP_ED25519_PUBLIC_KEY_PEM'),
idp_encryption_key=os.getenv('IDP_X25519_PUBLIC_KEY_PEM'),
sp_id=os.getenv('OAUTH_CLIENT_ID')
)
def initiate_mfa_setup(self, user_email: str, mfa_method: str, phone_number: str = None):
"""Initiate MFA setup for a user"""
try:
result = self.client.initiate_mfa_change(
userid=user_email,
mfa_method=MfaMethod(mfa_method),
phone_number=phone_number
)
return result
except Exception as e:
raise Exception(f'Failed to initiate MFA: {str(e)}')
def verify_mfa_code(self, user_email: str, session_id: str, otp: str):
"""Verify MFA code"""
try:
result = self.client.verify_mfa_change(
userid=user_email,
session_id=session_id,
otp=otp
)
return result
except Exception as e:
raise Exception(f'Failed to verify MFA: {str(e)}')
def get_user_mfa_status(self, user_email: str):
"""Get user's current MFA settings"""
try:
creds = self.client.get_user_credentials(user_email)
return {
'mfaEnabled': creds.get('mfa_method') != 'none',
'mfaMethod': creds.get('mfa_method')
}
except Exception as e:
raise Exception(f'Failed to fetch MFA status: {str(e)}')Create MFA endpoints:
# Flask example
from flask import jsonify
from services.mfa_service import MFAService
from middleware.auth import require_auth
mfa_service = MFAService()
@app.route('/api/mfa/initiate', methods=['POST'])
@require_auth
def initiate_mfa():
try:
data = request.get_json()
user_email = request.user.get('email')
result = mfa_service.initiate_mfa_setup(
user_email=user_email,
mfa_method=data.get('mfaMethod'),
phone_number=data.get('phoneNumber')
)
# Store session ID in cache with TTL
cache.set(f'mfa_session:{user_email}', result['session_id'], 600)
return jsonify({
'success': True,
'message': 'MFA setup initiated. Check your email/SMS for verification code.'
})
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/mfa/verify', methods=['POST'])
@require_auth
def verify_mfa():
try:
data = request.get_json()
user_email = request.user.get('email')
session_id = cache.get(f'mfa_session:{user_email}')
if not session_id:
return jsonify({'error': 'MFA session expired. Please try again.'}), 400
result = mfa_service.verify_mfa_code(
user_email=user_email,
session_id=session_id,
otp=data.get('otp')
)
if result['status'] != 'success':
return jsonify({'error': 'Invalid verification code'}), 400
cache.delete(f'mfa_session:{user_email}')
return jsonify({
'success': True,
'message': 'MFA setup completed successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/mfa/status', methods=['GET'])
@require_auth
def get_mfa_status():
try:
user_email = request.user.get('email')
status = mfa_service.get_user_mfa_status(user_email)
return jsonify(status)
except Exception as e:
return jsonify({'error': str(e)}), 500Step 7: Error Handling
Create a centralized error handler:
# utils/errors.py
from hawcx_oauth_client import (
OAuthExchangeError,
JWTVerificationError,
InvalidPublicKeyError
)
def handle_hawcx_error(error: Exception):
"""Convert Hawcx errors to user-friendly messages"""
if isinstance(error, OAuthExchangeError):
return {
'status': 401,
'message': 'The authorization code is invalid or expired'
}
if isinstance(error, JWTVerificationError):
return {
'status': 401,
'message': 'The authentication token could not be verified'
}
if isinstance(error, InvalidPublicKeyError):
return {
'status': 500,
'message': 'Server configuration error'
}
return {
'status': 500,
'message': 'An unexpected error occurred'
}Complete Example
Here's a minimal Flask example:
# app.py
from flask import Flask, request, jsonify
from hawcx_oauth_client import exchange_code_for_claims
import os
import jwt
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')
@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
try:
data = request.get_json()
code = data.get('code')
claims = exchange_code_for_claims(
code=code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY')
)
session_token = jwt.encode(
{
'userId': claims['sub'],
'email': claims['email'],
'exp': datetime.utcnow() + timedelta(days=7)
},
os.getenv('SESSION_SECRET'),
algorithm='HS256'
)
return jsonify({'sessionToken': session_token})
except Exception as e:
return jsonify({'error': 'Authentication failed'}), 401
@app.route('/api/me', methods=['GET'])
def get_me():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
try:
payload = jwt.decode(token, os.getenv('SESSION_SECRET'), algorithms=['HS256'])
return jsonify(payload)
except:
return jsonify({'error': 'Unauthorized'}), 401
if __name__ == '__main__':
app.run(debug=False, port=5000)Next Steps
- Review the complete API reference for all available methods
- Explore MFA features for advanced user management
- Check out the demo app for a full working example