Files

175 lines
5.8 KiB
Python
Raw Permalink Normal View History

2025-11-18 07:17:17 +01:00
# ================================================================
# services/forecasting/app/api/webhooks.py
# ================================================================
"""
Webhooks API - Receive events from other services
"""
from fastapi import APIRouter, HTTPException, status, Header
from typing import Dict, Any, Optional
from uuid import UUID
from datetime import date
import structlog
from pydantic import BaseModel, Field
from app.jobs.sales_data_listener import (
handle_sales_import_completion,
handle_pos_sync_completion
)
from shared.routing import RouteBuilder
# RouteBuilder instance created with the 'forecasting' argument
# (presumably the service name — confirm against shared.routing).
# NOTE(review): not referenced by any route visible in this file; verify it is still needed.
route_builder = RouteBuilder('forecasting')
# Router that collects the webhook endpoints; every route is tagged "webhooks".
router = APIRouter(tags=["webhooks"])
# Module-level structlog logger used by the endpoint handlers below.
logger = structlog.get_logger()
# ================================================================
# Request Schemas
# ================================================================
class SalesImportWebhook(BaseModel):
    """Webhook payload for sales data import completion"""

    # Required identifiers of the tenant and of the import job that finished.
    tenant_id: UUID = Field(
        ...,
        description="Tenant ID",
    )
    import_job_id: str = Field(
        ...,
        description="Import job ID",
    )

    # Date range covered by the imported data (both required).
    start_date: date = Field(
        ...,
        description="Start date of imported data",
    )
    end_date: date = Field(
        ...,
        description="End date of imported data",
    )

    # Number of imported records; negative values are rejected (ge=0).
    records_count: int = Field(
        ...,
        ge=0,
        description="Number of records imported",
    )

    # Optional; falls back to "import" when the sender omits it.
    import_source: str = Field(
        default="import",
        description="Source of import",
    )
class POSSyncWebhook(BaseModel):
    """Webhook payload for POS sync completion"""

    # Required identifiers of the tenant and of the POS sync log entry.
    tenant_id: UUID = Field(
        ...,
        description="Tenant ID",
    )
    sync_log_id: str = Field(
        ...,
        description="POS sync log ID",
    )

    # The single date the synced data refers to.
    sync_date: date = Field(
        ...,
        description="Date of synced data",
    )

    # Number of synced records; negative values are rejected (ge=0).
    records_synced: int = Field(
        ...,
        ge=0,
        description="Number of records synced",
    )
# ================================================================
# Endpoints
# ================================================================
@router.post(
    "/webhooks/sales-import-completed",
    status_code=status.HTTP_202_ACCEPTED
)
async def sales_import_completed_webhook(
    payload: SalesImportWebhook,
    x_webhook_signature: Optional[str] = Header(None, description="Webhook signature for verification")
):
    """
    Webhook endpoint for sales data import completion

    Called by the sales service when a data import completes.
    Triggers validation backfill for the imported date range.

    Note: In production, this should verify the webhook signature
    to ensure the request comes from a trusted source. The
    ``x_webhook_signature`` header is accepted but not checked yet.

    Args:
        payload: Validated import-completion event.
        x_webhook_signature: Optional signature header (currently unused).

    Returns:
        A dict with ``status``, ``message`` and the handler's ``result``.

    Raises:
        HTTPException: 500 when processing fails unexpectedly. An
            HTTPException raised during processing is propagated unchanged.
    """
    try:
        logger.info(
            "Received sales import completion webhook",
            tenant_id=payload.tenant_id,
            import_job_id=payload.import_job_id,
            date_range=f"{payload.start_date} to {payload.end_date}"
        )

        # TODO: In production, verify webhook signature here
        # if not verify_webhook_signature(x_webhook_signature, payload):
        #     raise HTTPException(status_code=401, detail="Invalid webhook signature")

        # Hand the event to the listener and wait for its result.
        result = await handle_sales_import_completion(
            tenant_id=payload.tenant_id,
            import_job_id=payload.import_job_id,
            start_date=payload.start_date,
            end_date=payload.end_date,
            records_count=payload.records_count,
            import_source=payload.import_source
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. a 401 from signature verification)
        # must reach the client as-is instead of being rewrapped as a 500.
        raise
    except Exception as e:
        logger.error(
            "Failed to process sales import webhook",
            payload=payload.dict(),
            error=str(e),
            exc_info=True  # keep the traceback in the log record
        )
        # NOTE(review): the detail echoes the internal error text to the
        # caller; kept for backward compatibility — confirm this is intended.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to process webhook: {str(e)}"
        ) from e
    else:
        return {
            "status": "accepted",
            "message": "Sales import completion event received and processing",
            "result": result
        }
@router.post(
    "/webhooks/pos-sync-completed",
    status_code=status.HTTP_202_ACCEPTED
)
async def pos_sync_completed_webhook(
    payload: POSSyncWebhook,
    x_webhook_signature: Optional[str] = Header(None, description="Webhook signature for verification")
):
    """
    Webhook endpoint for POS sync completion

    Called by the POS service when data synchronization completes.
    Triggers validation for the synced date.

    The ``x_webhook_signature`` header is accepted but not checked yet.

    Args:
        payload: Validated sync-completion event.
        x_webhook_signature: Optional signature header (currently unused).

    Returns:
        A dict with ``status``, ``message`` and the handler's ``result``.

    Raises:
        HTTPException: 500 when processing fails unexpectedly. An
            HTTPException raised during processing is propagated unchanged.
    """
    try:
        logger.info(
            "Received POS sync completion webhook",
            tenant_id=payload.tenant_id,
            sync_log_id=payload.sync_log_id,
            sync_date=payload.sync_date.isoformat()
        )

        # TODO: In production, verify webhook signature here
        # if not verify_webhook_signature(x_webhook_signature, payload):
        #     raise HTTPException(status_code=401, detail="Invalid webhook signature")

        # Hand the event to the listener and wait for its result.
        result = await handle_pos_sync_completion(
            tenant_id=payload.tenant_id,
            sync_log_id=payload.sync_log_id,
            sync_date=payload.sync_date,
            records_synced=payload.records_synced
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. a 401 from signature verification)
        # must reach the client as-is instead of being rewrapped as a 500.
        raise
    except Exception as e:
        logger.error(
            "Failed to process POS sync webhook",
            payload=payload.dict(),
            error=str(e),
            exc_info=True  # keep the traceback in the log record
        )
        # NOTE(review): the detail echoes the internal error text to the
        # caller; kept for backward compatibility — confirm this is intended.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to process webhook: {str(e)}"
        ) from e
    else:
        return {
            "status": "accepted",
            "message": "POS sync completion event received and processing",
            "result": result
        }
@router.get("/webhooks/health", status_code=status.HTTP_200_OK)
async def webhook_health_check():
    """Health check endpoint for webhook receiver"""
    # Webhook routes advertised in the health response.
    advertised_endpoints = [
        "/webhooks/sales-import-completed",
        "/webhooks/pos-sync-completed",
    ]
    # Static payload: nothing is probed, the values are fixed.
    return dict(
        status="healthy",
        service="forecasting-webhooks",
        endpoints=advertised_endpoints,
    )