Integrating Hawcx Backend SDK into Existing Python Projects
This guide walks through adding Hawcx OAuth authentication to an existing Python backend.Step 1: Install the SDK
Copy
pip install hawcx-oauth-client
# Or with Flask support
pip install "hawcx-oauth-client[flask]"
Step 2: Set Up Environment Variables
Create a.env file or configure your environment with:
Copy
# OAuth Configuration (required)
OAUTH_TOKEN_ENDPOINT=https://oauth.hawcx.com/token
OAUTH_CLIENT_ID=your_client_id
OAUTH_PUBLIC_KEY=-----BEGIN PUBLIC KEY-----\n...
# Optional but recommended
OAUTH_ISSUER=https://oauth.hawcx.com
OAUTH_AUDIENCE=your_client_id
# For MFA features (if using delegation client)
SP_ED25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
SP_X25519_PRIVATE_KEY_PEM=-----BEGIN PRIVATE KEY-----\n...
IDP_ED25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...
IDP_X25519_PUBLIC_KEY_PEM=-----BEGIN PUBLIC KEY-----\n...
Copy
import os
from dotenv import load_dotenv
load_dotenv()
Step 3: Create an Authentication Endpoint
Flask
Copy
from flask import Flask, request, jsonify, session
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError, JWTVerificationError
import os
import jwt
from datetime import datetime, timedelta
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')
@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
try:
data = request.get_json() or {}
code = data.get('code')
if not code:
return jsonify({'error': 'Missing authorization code'}), 400
# Exchange the authorization code for verified claims
claims = exchange_code_for_claims(
code=code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY'),
audience=os.getenv('OAUTH_AUDIENCE') or os.getenv('OAUTH_CLIENT_ID'),
issuer=os.getenv('OAUTH_ISSUER'),
leeway=10
)
# Find or create user in your database
user = find_or_create_user(
hawcx_id=claims['sub'],
email=claims['email'],
email_verified=claims.get('email_verified', False)
)
# Create your application's session/JWT
session_token = generate_session_token(user)
return jsonify({
'success': True,
'sessionToken': session_token,
'user': {
'id': user['id'],
'email': user['email']
}
})
except OAuthExchangeError as e:
app.logger.error(f'OAuth exchange failed: {e}')
return jsonify({'error': 'Authentication failed'}), 401
except JWTVerificationError as e:
app.logger.error(f'JWT verification failed: {e}')
return jsonify({'error': 'Authentication failed'}), 401
except Exception as e:
app.logger.error(f'Unexpected error: {e}')
return jsonify({'error': 'Internal server error'}), 500
Django
Copy
# views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import json
import os
import jwt
from datetime import datetime, timedelta
@csrf_exempt
@require_http_methods(["POST"])
def auth_callback(request):
try:
data = json.loads(request.body)
code = data.get('code')
if not code:
return JsonResponse({'error': 'Missing authorization code'}, status=400)
# Exchange the authorization code for verified claims
claims = exchange_code_for_claims(
code=code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY')
)
# Find or create user
user = find_or_create_user(
hawcx_id=claims['sub'],
email=claims['email']
)
# Create session token
session_token = generate_session_token(user)
return JsonResponse({
'success': True,
'sessionToken': session_token,
'user': {
'id': user['id'],
'email': user['email']
}
})
except OAuthExchangeError as e:
return JsonResponse({'error': 'Authentication failed'}, status=401)
FastAPI
Copy
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from hawcx_oauth_client import exchange_code_for_claims, OAuthExchangeError
import os
import jwt
app = FastAPI()
class AuthCallbackRequest(BaseModel):
code: str
@app.post("/api/auth/hawcx/callback")
async def auth_callback(request: AuthCallbackRequest):
try:
if not request.code:
raise HTTPException(status_code=400, detail="Missing authorization code")
# Exchange the authorization code for verified claims
claims = exchange_code_for_claims(
code=request.code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY')
)
# Find or create user
user = find_or_create_user(
hawcx_id=claims['sub'],
email=claims['email']
)
# Create session token
session_token = generate_session_token(user)
return {
'success': True,
'sessionToken': session_token,
'user': {
'id': user['id'],
'email': user['email']
}
}
except OAuthExchangeError:
raise HTTPException(status_code=401, detail="Authentication failed")
Step 4: Integrate with Your User Management
Create a user service module:Copy
# services/user_service.py
from datetime import datetime
import os
import jwt
from typing import Dict, Any, Optional
def find_or_create_user(hawcx_id: str, email: str, email_verified: bool = False) -> Dict[str, Any]:
"""Find existing user or create new one based on Hawcx claims"""
# Import your database module
from app.db import db
# Check if user exists
user = db.session.query(User).filter_by(hawcx_id=hawcx_id).first()
if user:
# Update email_verified if needed
if email_verified and not user.email_verified:
user.email_verified = True
db.session.commit()
return user.to_dict()
# Create new user
new_user = User(
hawcx_id=hawcx_id,
email=email,
email_verified=email_verified,
created_at=datetime.utcnow(),
last_login=datetime.utcnow()
)
db.session.add(new_user)
db.session.commit()
return new_user.to_dict()
def generate_session_token(user: Dict[str, Any]) -> str:
"""Generate JWT session token for user"""
return jwt.encode(
{
'userId': str(user['id']),
'email': user['email'],
'hawcxId': user['hawcx_id'],
'exp': datetime.utcnow() + timedelta(days=7)
},
os.getenv('SESSION_SECRET'),
algorithm='HS256'
)
def verify_session_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify and decode session token"""
try:
return jwt.decode(
token,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
except jwt.InvalidTokenError:
return None
Step 5: Add Authentication Middleware/Decorators
Flask
Copy
# middleware/auth.py
from functools import wraps
from flask import request, jsonify
from hawcx_oauth_client import verify_jwt
import os
def require_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return jsonify({'error': 'Missing authorization token'}), 401
try:
payload = verify_jwt(
token,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
request.user = payload
return f(*args, **kwargs)
except Exception as e:
return jsonify({'error': 'Invalid or expired token'}), 401
return decorated_function
# Usage
@app.route('/api/me', methods=['GET'])
@require_auth
def get_current_user():
return jsonify(request.user)
FastAPI
Copy
# dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthCredentials
import jwt
import os
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthCredentials = Depends(security)):
try:
payload = jwt.decode(
credentials.credentials,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
return payload
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
)
# Usage
@app.get("/api/me")
async def get_current_user_endpoint(user = Depends(get_current_user)):
return user
Django
Copy
# decorators.py
from functools import wraps
from django.http import JsonResponse
import jwt
import os
def require_auth(view_func):
@wraps(view_func)
def wrapped_view(request, *args, **kwargs):
auth_header = request.headers.get('Authorization', '')
token = auth_header.replace('Bearer ', '') if auth_header else None
if not token:
return JsonResponse({'error': 'Missing authorization token'}, status=401)
try:
payload = jwt.decode(
token,
os.getenv('SESSION_SECRET'),
algorithms=['HS256']
)
request.user = payload
return view_func(request, *args, **kwargs)
except jwt.InvalidTokenError:
return JsonResponse({'error': 'Invalid or expired token'}, status=401)
return wrapped_view
# Usage
@require_auth
def get_current_user(request):
return JsonResponse(request.user)
Step 6: Set Up MFA (Optional)
Create an MFA service:Copy
# services/mfa_service.py
from hawcx_oauth_client.delegation import HawcxDelegationClient, MfaMethod
import os
class MFAService:
def __init__(self):
self.client = HawcxDelegationClient.from_keys(
sp_signing_key=os.getenv('SP_ED25519_PRIVATE_KEY_PEM'),
sp_encryption_key=os.getenv('SP_X25519_PRIVATE_KEY_PEM'),
idp_verify_key=os.getenv('IDP_ED25519_PUBLIC_KEY_PEM'),
idp_encryption_key=os.getenv('IDP_X25519_PUBLIC_KEY_PEM'),
sp_id=os.getenv('OAUTH_CLIENT_ID')
)
def initiate_mfa_setup(self, user_email: str, mfa_method: str, phone_number: str = None):
"""Initiate MFA setup for a user"""
try:
result = self.client.initiate_mfa_change(
userid=user_email,
mfa_method=MfaMethod(mfa_method),
phone_number=phone_number
)
return result
except Exception as e:
raise Exception(f'Failed to initiate MFA: {str(e)}')
def verify_mfa_code(self, user_email: str, session_id: str, otp: str):
"""Verify MFA code"""
try:
result = self.client.verify_mfa_change(
userid=user_email,
session_id=session_id,
otp=otp
)
return result
except Exception as e:
raise Exception(f'Failed to verify MFA: {str(e)}')
def get_user_mfa_status(self, user_email: str):
"""Get user's current MFA settings"""
try:
creds = self.client.get_user_credentials(user_email)
return {
'mfaEnabled': creds.get('mfa_method') != 'none',
'mfaMethod': creds.get('mfa_method')
}
except Exception as e:
raise Exception(f'Failed to fetch MFA status: {str(e)}')
Copy
# Flask example
from flask import jsonify
from services.mfa_service import MFAService
from middleware.auth import require_auth
mfa_service = MFAService()
@app.route('/api/mfa/initiate', methods=['POST'])
@require_auth
def initiate_mfa():
try:
data = request.get_json()
user_email = request.user.get('email')
result = mfa_service.initiate_mfa_setup(
user_email=user_email,
mfa_method=data.get('mfaMethod'),
phone_number=data.get('phoneNumber')
)
# Store session ID in cache with TTL
cache.set(f'mfa_session:{user_email}', result['session_id'], 600)
return jsonify({
'success': True,
'message': 'MFA setup initiated. Check your email/SMS for verification code.'
})
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/mfa/verify', methods=['POST'])
@require_auth
def verify_mfa():
try:
data = request.get_json()
user_email = request.user.get('email')
session_id = cache.get(f'mfa_session:{user_email}')
if not session_id:
return jsonify({'error': 'MFA session expired. Please try again.'}), 400
result = mfa_service.verify_mfa_code(
user_email=user_email,
session_id=session_id,
otp=data.get('otp')
)
if result['status'] != 'success':
return jsonify({'error': 'Invalid verification code'}), 400
cache.delete(f'mfa_session:{user_email}')
return jsonify({
'success': True,
'message': 'MFA setup completed successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/mfa/status', methods=['GET'])
@require_auth
def get_mfa_status():
try:
user_email = request.user.get('email')
status = mfa_service.get_user_mfa_status(user_email)
return jsonify(status)
except Exception as e:
return jsonify({'error': str(e)}), 500
Step 7: Error Handling
Create a centralized error handler:Copy
# utils/errors.py
from hawcx_oauth_client import (
OAuthExchangeError,
JWTVerificationError,
InvalidPublicKeyError
)
def handle_hawcx_error(error: Exception):
"""Convert Hawcx errors to user-friendly messages"""
if isinstance(error, OAuthExchangeError):
return {
'status': 401,
'message': 'The authorization code is invalid or expired'
}
if isinstance(error, JWTVerificationError):
return {
'status': 401,
'message': 'The authentication token could not be verified'
}
if isinstance(error, InvalidPublicKeyError):
return {
'status': 500,
'message': 'Server configuration error'
}
return {
'status': 500,
'message': 'An unexpected error occurred'
}
Complete Example
Here’s a minimal Flask example:Copy
# app.py
from flask import Flask, request, jsonify
from hawcx_oauth_client import exchange_code_for_claims
import os
import jwt
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY')
@app.route('/api/auth/hawcx/callback', methods=['POST'])
def auth_callback():
try:
data = request.get_json()
code = data.get('code')
claims = exchange_code_for_claims(
code=code,
oauth_token_url=os.getenv('OAUTH_TOKEN_ENDPOINT'),
client_id=os.getenv('OAUTH_CLIENT_ID'),
public_key=os.getenv('OAUTH_PUBLIC_KEY')
)
session_token = jwt.encode(
{
'userId': claims['sub'],
'email': claims['email'],
'exp': datetime.utcnow() + timedelta(days=7)
},
os.getenv('SESSION_SECRET'),
algorithm='HS256'
)
return jsonify({'sessionToken': session_token})
except Exception as e:
return jsonify({'error': 'Authentication failed'}), 401
@app.route('/api/me', methods=['GET'])
def get_me():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
try:
payload = jwt.decode(token, os.getenv('SESSION_SECRET'), algorithms=['HS256'])
return jsonify(payload)
except:
return jsonify({'error': 'Unauthorized'}), 401
if __name__ == '__main__':
app.run(debug=False, port=5000)
Next Steps
- Review the complete API reference for all available methods
- Explore MFA features for advanced user management
- Check out the demo app for a full working example
