245 lines
6.4 KiB
Python
245 lines
6.4 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
Generate Service-to-Service Authentication Token

This script generates JWT tokens for service-to-service communication
in the Bakery-IA tenant deletion system.

Usage:
    python scripts/generate_service_token.py <service_name> [--days DAYS]
    python scripts/generate_service_token.py --all [--days DAYS]
    python scripts/generate_service_token.py --verify TOKEN
    python scripts/generate_service_token.py --list-services

Examples:
    # Generate token for orchestrator (1 year expiration)
    python scripts/generate_service_token.py tenant-deletion-orchestrator

    # Generate token for specific service with custom expiration
    python scripts/generate_service_token.py auth-service --days 90

    # Generate tokens for all services
    python scripts/generate_service_token.py --all
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import sys
|
|||
|
|
import os
|
|||
|
|
import argparse
|
|||
|
|
from datetime import timedelta
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
# Add the project root (the parent of this script's directory) to the import
# path so the `shared` package imported below can be found.
# The path is resolved to an absolute one first: a bare relative `__file__`
# such as "generate_service_token.py" has "." as both its parent and its
# grandparent, which would put the script directory on sys.path instead of
# the project root.
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
|
|||
|
|
|
|||
|
|
from shared.auth.jwt_handler import JWTHandler
|
|||
|
|
|
|||
|
|
# Signing secret for the tokens, taken from the JWT_SECRET_KEY environment
# variable; a placeholder string is used when the variable is not defined.
# NOTE(review): the placeholder is not a real secret - confirm it is never
# relied on outside local development.
JWT_SECRET_KEY = os.environ.get(
    "JWT_SECRET_KEY",
    "your-secret-key-change-in-production-min-32-chars",
)
|
|||
|
|
|
|||
|
|
# Known service names: `--all` generates one token per entry and
# `--list-services` prints them, in this order.
SERVICES = [
    # Deletion orchestrator
    "tenant-deletion-orchestrator",
    # Individual services
    "auth-service",
    "tenant-service",
    "orders-service",
    "inventory-service",
    "recipes-service",
    "sales-service",
    "production-service",
    "suppliers-service",
    "pos-service",
    "external-service",
    "forecasting-service",
    "training-service",
    "alert-processor-service",
    "notification-service",
]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def generate_token(service_name: str, days: int = 365) -> str:
    """
    Generate a signed service token.

    Args:
        service_name: Name of the service the token is issued for.
            Must not be empty or whitespace-only.
        days: Token lifetime in days (default: 365). Must be positive.

    Returns:
        The token produced by ``JWTHandler.create_service_token``.

    Raises:
        ValueError: If ``service_name`` is blank or ``days`` is not positive.
    """
    # Validate up front so bad input fails with a clear message before any
    # signing happens. A zero or negative lifetime would be handed to
    # timedelta unchanged and presumably yield a token that is already
    # expired when issued.
    if not service_name or not service_name.strip():
        raise ValueError("service_name must be a non-empty string")
    if days <= 0:
        raise ValueError(f"days must be a positive integer, got {days}")

    jwt_handler = JWTHandler(
        secret_key=JWT_SECRET_KEY,
        algorithm="HS256",
    )
    return jwt_handler.create_service_token(
        service_name=service_name,
        expires_delta=timedelta(days=days),
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def verify_token(token: str) -> dict:
    """
    Check a service token and hand back its decoded payload.

    Args:
        token: The JWT string to check.

    Returns:
        The payload returned by ``JWTHandler.verify_token``.

    Raises:
        ValueError: If the handler returns a falsy payload for the token.
    """
    handler = JWTHandler(algorithm="HS256", secret_key=JWT_SECRET_KEY)
    decoded = handler.verify_token(token)

    if decoded:
        return decoded
    raise ValueError("Invalid or expired token")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
parser = argparse.ArgumentParser(
|
|||
|
|
description="Generate service-to-service authentication tokens",
|
|||
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|||
|
|
epilog="""
|
|||
|
|
Examples:
|
|||
|
|
# Generate token for orchestrator
|
|||
|
|
%(prog)s tenant-deletion-orchestrator
|
|||
|
|
|
|||
|
|
# Generate token with custom expiration
|
|||
|
|
%(prog)s auth-service --days 90
|
|||
|
|
|
|||
|
|
# Generate tokens for all services
|
|||
|
|
%(prog)s --all
|
|||
|
|
|
|||
|
|
# Verify a token
|
|||
|
|
%(prog)s --verify <token>
|
|||
|
|
"""
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
parser.add_argument(
|
|||
|
|
"service_name",
|
|||
|
|
nargs="?",
|
|||
|
|
help="Name of the service (e.g., 'tenant-deletion-orchestrator')"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--days",
|
|||
|
|
type=int,
|
|||
|
|
default=365,
|
|||
|
|
help="Token expiration in days (default: 365)"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--all",
|
|||
|
|
action="store_true",
|
|||
|
|
help="Generate tokens for all services"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--verify",
|
|||
|
|
metavar="TOKEN",
|
|||
|
|
help="Verify a token and show its payload"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--list-services",
|
|||
|
|
action="store_true",
|
|||
|
|
help="List all available service names"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
args = parser.parse_args()
|
|||
|
|
|
|||
|
|
# List services
|
|||
|
|
if args.list_services:
|
|||
|
|
print("\nAvailable Services:")
|
|||
|
|
print("=" * 50)
|
|||
|
|
for service in SERVICES:
|
|||
|
|
print(f" - {service}")
|
|||
|
|
print()
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
# Verify token
|
|||
|
|
if args.verify:
|
|||
|
|
try:
|
|||
|
|
payload = verify_token(args.verify)
|
|||
|
|
print("\n✓ Token is valid!")
|
|||
|
|
print("=" * 50)
|
|||
|
|
print(f"Service Name: {payload.get('service')}")
|
|||
|
|
print(f"Type: {payload.get('type')}")
|
|||
|
|
print(f"Is Service: {payload.get('is_service')}")
|
|||
|
|
print(f"Role: {payload.get('role')}")
|
|||
|
|
print(f"Issued At: {payload.get('iat')}")
|
|||
|
|
print(f"Expires At: {payload.get('exp')}")
|
|||
|
|
print("=" * 50)
|
|||
|
|
print()
|
|||
|
|
return 0
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n✗ Token verification failed: {e}\n")
|
|||
|
|
return 1
|
|||
|
|
|
|||
|
|
# Generate for all services
|
|||
|
|
if args.all:
|
|||
|
|
print(f"\nGenerating service tokens (expires in {args.days} days)...")
|
|||
|
|
print("=" * 80)
|
|||
|
|
|
|||
|
|
for service in SERVICES:
|
|||
|
|
try:
|
|||
|
|
token = generate_token(service, args.days)
|
|||
|
|
print(f"\n{service}:")
|
|||
|
|
print(f" export {service.upper().replace('-', '_')}_TOKEN='{token}'")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n✗ Failed to generate token for {service}: {e}")
|
|||
|
|
|
|||
|
|
print("\n" + "=" * 80)
|
|||
|
|
print("\nℹ Copy the export statements above to set environment variables")
|
|||
|
|
print("ℹ Or save them to a .env file for your services\n")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
# Generate for single service
|
|||
|
|
if not args.service_name:
|
|||
|
|
parser.print_help()
|
|||
|
|
return 1
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
print(f"\nGenerating service token for: {args.service_name}")
|
|||
|
|
print(f"Expiration: {args.days} days")
|
|||
|
|
print("=" * 80)
|
|||
|
|
|
|||
|
|
token = generate_token(args.service_name, args.days)
|
|||
|
|
|
|||
|
|
print("\n✓ Token generated successfully!\n")
|
|||
|
|
print("Token:")
|
|||
|
|
print(f" {token}")
|
|||
|
|
print()
|
|||
|
|
print("Environment Variable:")
|
|||
|
|
env_var = args.service_name.upper().replace('-', '_') + '_TOKEN'
|
|||
|
|
print(f" export {env_var}='{token}'")
|
|||
|
|
print()
|
|||
|
|
print("Usage in Code:")
|
|||
|
|
print(f" headers = {{'Authorization': f'Bearer {{os.getenv(\"{env_var}\")}}'}}")
|
|||
|
|
print()
|
|||
|
|
print("Test with curl:")
|
|||
|
|
print(f" curl -H 'Authorization: Bearer {token}' https://localhost/api/v1/...")
|
|||
|
|
print()
|
|||
|
|
print("=" * 80)
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
# Verify the token we just created
|
|||
|
|
print("Verifying token...")
|
|||
|
|
payload = verify_token(token)
|
|||
|
|
print("✓ Token is valid and verified!\n")
|
|||
|
|
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n✗ Error: {e}\n")
|
|||
|
|
return 1
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    raise SystemExit(main())
|