Files
bakery-ia/services/production/app/services/production_service.py
2025-12-15 13:39:33 +01:00

2265 lines
96 KiB
Python

"""
Production Service
Main business logic for production operations
"""
from typing import Optional, List, Dict, Any
from datetime import datetime, date, timedelta, timezone
from uuid import UUID
import structlog
from shared.database.transactions import transactional
from shared.clients import get_inventory_client, get_sales_client
from shared.clients.orders_client import OrdersServiceClient
from shared.clients.recipes_client import RecipesServiceClient
from shared.config.base import BaseServiceSettings
from app.repositories.production_batch_repository import ProductionBatchRepository
from app.repositories.production_schedule_repository import ProductionScheduleRepository
from app.repositories.production_capacity_repository import ProductionCapacityRepository
from app.repositories.quality_check_repository import QualityCheckRepository
from app.models.production import ProductionBatch, ProductionSchedule, ProductionStatus, ProductionPriority
from app.schemas.production import (
ProductionBatchCreate, ProductionBatchUpdate, ProductionBatchStatusUpdate,
ProductionScheduleCreate, ProductionScheduleUpdate, ProductionScheduleResponse,
DailyProductionRequirements, ProductionDashboardSummary, ProductionMetrics
)
from app.utils.cache import delete_cached, make_cache_key
from app.services.production_notification_service import ProductionNotificationService
logger = structlog.get_logger()
class ProductionService:
"""Main production service with business logic"""
def __init__(
    self,
    database_manager,
    config: BaseServiceSettings,
    notification_service: Optional[ProductionNotificationService] = None
):
    """Store the collaborators and build the shared service clients.

    Args:
        database_manager: Object whose ``get_session()`` is used by the
            service methods to open database sessions.
        config: Service settings; also handed to every shared client.
        notification_service: Optional notification emitter. Methods that
            emit notifications skip that step when this is ``None``.
    """
    self.database_manager, self.config = database_manager, config
    self.notification_service = notification_service

    # Clients for the other services this one talks to
    self.inventory_client = get_inventory_client(config, "production")
    self.orders_client = OrdersServiceClient(config)
    self.recipes_client = RecipesServiceClient(config)
    self.sales_client = get_sales_client(config, "production")
async def calculate_daily_requirements(
    self,
    tenant_id: UUID,
    target_date: date
) -> DailyProductionRequirements:
    """Compute the production requirements of a tenant for one day.

    Gathers demand (orders client), stock levels (inventory client), recipe
    requirements (recipes client) and the available capacity (database),
    then hands everything to ``_calculate_production_plan``.

    Args:
        tenant_id: Tenant the plan is computed for.
        target_date: Day the plan is computed for.

    Returns:
        The plan produced by ``_calculate_production_plan``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        tenant_key = str(tenant_id)

        # Inputs owned by the other services
        demand = await self.orders_client.get_demand_requirements(
            tenant_key, target_date.isoformat()
        )
        stock = await self.inventory_client.get_stock_levels(tenant_key)
        recipes = await self.recipes_client.get_recipe_requirements(tenant_key)

        # Capacity is read from this service's own database
        async with self.database_manager.get_session() as session:
            capacity = await self._calculate_available_capacity(
                ProductionCapacityRepository(session), tenant_id, target_date
            )

        return await self._calculate_production_plan(
            tenant_id, target_date, demand, stock, recipes, capacity
        )
    except Exception as e:
        logger.error("Error calculating daily production requirements",
                     error=str(e), tenant_id=str(tenant_id), date=target_date.isoformat())
        raise
async def create_production_batch(
    self,
    tenant_id: UUID,
    batch_data: ProductionBatchCreate
) -> ProductionBatch:
    """Persist a new production batch for a tenant.

    When the batch references a recipe, the recipe is looked up (a missing
    recipe raises ``ValueError``), its ``quality_check_configuration`` is
    read and ingredient availability is checked. Unavailable ingredients
    only produce a warning log; the batch is still created.

    Args:
        tenant_id: Owner of the new batch; stored on the batch payload.
        batch_data: Creation payload.

    Returns:
        The batch returned by the repository.

    Raises:
        ValueError: If the referenced recipe cannot be found.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            repository = ProductionBatchRepository(session)

            payload = batch_data.model_dump()
            payload["tenant_id"] = tenant_id

            quality_config = None
            if batch_data.recipe_id:
                tenant_key = str(tenant_id)
                recipe_key = str(batch_data.recipe_id)

                # The recipe must exist; it also carries the quality configuration
                recipe = await self.recipes_client.get_recipe_by_id(tenant_key, recipe_key)
                if not recipe:
                    raise ValueError(f"Recipe {batch_data.recipe_id} not found")
                quality_config = recipe.get("quality_check_configuration")

                # Availability is advisory: a shortfall is logged, not enforced
                needs = await self.recipes_client.calculate_ingredients_for_quantity(
                    tenant_key, recipe_key, batch_data.planned_quantity
                )
                if needs:
                    availability = await self.inventory_client.check_availability(
                        tenant_key, needs.get("requirements", [])
                    )
                    if not (availability and availability.get("all_available", True)):
                        logger.warning("Insufficient ingredients for batch",
                                       batch_data=payload, availability=availability)

            batch = await repository.create_batch(payload)

            # Quality checks are inherited unless the recipe opts out
            if quality_config and quality_config.get("auto_create_quality_checks", True):
                await self._setup_batch_quality_checks(session, batch, quality_config, tenant_id)

            logger.info("Production batch created with quality inheritance",
                        batch_id=str(batch.id), tenant_id=str(tenant_id),
                        has_quality_config=bool(quality_config))
            return batch
    except Exception as e:
        logger.error("Error creating production batch",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def _setup_batch_quality_checks(
self,
session,
batch: ProductionBatch,
quality_config: Dict[str, Any],
tenant_id: UUID
):
"""Set up quality checks for a production batch based on recipe configuration.

Reads the per-stage settings under ``quality_config["stages"]``, records a
"pending" entry for every stage that names at least one template, required
check or optional check, stores the resulting structures on ``batch`` and
commits ``session``.

NOTE(review): block indentation is not visible in this copy of the file, so
it cannot be told from here whether the "completed" entry is created only
for stages that received a pending entry or for every stage. Confirm against
the repository version before restructuring this method.

Args:
session: Session that is committed once the batch has been updated.
batch: Batch whose ``pending_quality_checks`` and
``completed_quality_checks`` attributes are overwritten.
quality_config: Recipe quality configuration; only its ``"stages"``
mapping is read here.
tenant_id: Not read by this method's body.

Raises:
Exception: Any failure is logged and re-raised.
"""
try:
# Initialize pending and completed quality checks structures
pending_quality_checks = {}
completed_quality_checks = {}
# Process each stage configuration
stages = quality_config.get("stages", {})
for stage_name, stage_config in stages.items():
template_ids = stage_config.get("template_ids", [])
required_checks = stage_config.get("required_checks", [])
optional_checks = stage_config.get("optional_checks", [])
min_quality_score = stage_config.get("min_quality_score")
blocking_on_failure = stage_config.get("blocking_on_failure", True)
# Set up pending checks for this stage
if template_ids or required_checks or optional_checks:
pending_quality_checks[stage_name] = {
# Template ids are stored as strings
"template_ids": [str(tid) for tid in template_ids],
"required_checks": required_checks,
"optional_checks": optional_checks,
"min_quality_score": min_quality_score,
"blocking_on_failure": blocking_on_failure,
"status": "pending"
}
# Initialize completed structure for this stage
completed_quality_checks[stage_name] = {
"checks": [],
"overall_score": None,
"passed": None,
"completed_at": None
}
# Update batch with quality check configuration
batch.pending_quality_checks = pending_quality_checks
batch.completed_quality_checks = completed_quality_checks
# Save the updated batch
await session.commit()
# total_templates counts template ids across all configured stages
logger.info("Quality checks setup completed for batch",
batch_id=str(batch.id),
stages_configured=list(stages.keys()),
total_templates=sum(len(stage.get("template_ids", [])) for stage in stages.values()))
except Exception as e:
logger.error("Error setting up batch quality checks",
error=str(e), batch_id=str(batch.id))
raise
async def update_batch_stage_with_quality_checks(
    self,
    tenant_id: UUID,
    batch_id: UUID,
    new_stage: str
) -> ProductionBatch:
    """Move a batch to a new process stage and create that stage's quality checks.

    If the batch's ``pending_quality_checks`` lists template ids for
    ``new_stage``, one quality-check record is created per template found.
    The stage change and the new records are committed together.

    Args:
        tenant_id: Tenant that owns the batch.
        batch_id: Batch to move.
        new_stage: Name of the stage the batch enters.

    Returns:
        The updated batch.

    Raises:
        ValueError: If the batch is not found for this tenant.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            batch_repo = ProductionBatchRepository(session)

            batch = await batch_repo.get_batch(tenant_id, batch_id)
            if not batch:
                raise ValueError(f"Batch {batch_id} not found")

            batch.current_process_stage = new_stage

            # Quality checks configured for the stage being entered, if any
            pending_checks = batch.pending_quality_checks or {}
            stage_checks = pending_checks.get(new_stage)
            if stage_checks and stage_checks.get("template_ids"):
                from app.repositories.quality_template_repository import QualityTemplateRepository
                template_repo = QualityTemplateRepository(session)
                quality_repo = QualityCheckRepository(session)

                # Template ids are stored as strings on the batch
                template_ids = [UUID(tid) for tid in stage_checks["template_ids"]]
                templates = await template_repo.get_templates_by_ids(str(tenant_id), template_ids)

                for template in templates:
                    quality_check_data = {
                        "tenant_id": tenant_id,
                        "batch_id": batch_id,
                        "template_id": template.id,
                        "check_type": template.check_type,
                        "process_stage": new_stage,
                        # Timezone-aware UTC, matching the aware datetimes this
                        # service compares check_time against elsewhere
                        "check_time": datetime.now(timezone.utc),
                        "quality_score": 0.0,  # To be filled when check is performed
                        "pass_fail": False,  # To be updated when check is performed
                        "defect_count": 0,
                        # NOTE(review): target_weight is set for every check type while
                        # target_temperature is conditional - confirm this is intended
                        "target_weight": template.target_value,
                        "target_temperature": template.target_value if template.check_type == "temperature" else None,
                        "tolerance_percentage": template.tolerance_percentage
                    }
                    await quality_repo.create_quality_check(quality_check_data)

                logger.info("Created quality checks for batch stage transition",
                            batch_id=str(batch_id),
                            stage=new_stage,
                            checks_created=len(templates))

            # Persist the stage change together with any new checks
            await session.commit()
            return batch
    except Exception as e:
        logger.error("Error updating batch stage with quality checks",
                     error=str(e), batch_id=str(batch_id), new_stage=new_stage)
        raise
async def get_production_batches_list(
    self,
    tenant_id: UUID,
    filters: Dict[str, Any],
    page: int,
    page_size: int
) -> Dict[str, Any]:
    """Return one page of a tenant's production batches.

    Filters whose value is ``None`` are dropped and the tenant id is always
    added (as a string) before the repository is queried.

    Args:
        tenant_id: Tenant whose batches are listed.
        filters: Filter name to value; ``None`` values are ignored.
        page: Page number passed to the repository.
        page_size: Page size passed to the repository.

    Returns:
        A ``ProductionBatchListResponse`` holding the batches, the total
        count and the paging values.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            repository = ProductionBatchRepository(session)

            # Only filters that were actually supplied, always tenant-scoped
            criteria = {
                **{name: value for name, value in filters.items() if value is not None},
                "tenant_id": str(tenant_id),
            }

            rows = await repository.get_batches_filtered(criteria, page, page_size)
            total = await repository.count_batches_filtered(criteria)

            from app.schemas.production import ProductionBatchResponse, ProductionBatchListResponse
            items = [ProductionBatchResponse.model_validate(row) for row in rows]
            return ProductionBatchListResponse(
                batches=items,
                total_count=total,
                page=page,
                page_size=page_size
            )
    except Exception as e:
        logger.error("Error getting production batches list",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def update_batch_status(
    self,
    tenant_id: UUID,
    batch_id: UUID,
    status_update: ProductionBatchStatusUpdate
) -> ProductionBatch:
    """Change the status of a tenant's production batch.

    The batch is first looked up for the tenant. If it is not found the
    call fails instead of falling through to the repository update, which
    is addressed by batch id only. On completion with a non-zero actual
    quantity the inventory is updated; the dashboard cache is invalidated
    and a state-change notification is emitted on a best-effort basis.

    Args:
        tenant_id: Tenant that owns the batch.
        batch_id: Batch to update.
        status_update: New status plus optional actual quantity and notes.

    Returns:
        The updated batch.

    Raises:
        ValueError: If the batch is not found for this tenant.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            batch_repo = ProductionBatchRepository(session)

            # Tenant-scoped lookup: guards the update and captures the old status
            current_batch = await batch_repo.get_batch(tenant_id, batch_id)
            if not current_batch:
                raise ValueError(f"Batch {batch_id} not found")
            old_status = current_batch.status.value

            batch = await batch_repo.update_batch_status(
                batch_id,
                status_update.status,
                status_update.actual_quantity,
                status_update.notes
            )

            # Update inventory if batch is completed
            if status_update.status == ProductionStatus.COMPLETED and status_update.actual_quantity:
                await self._update_inventory_on_completion(
                    tenant_id, batch, status_update.actual_quantity
                )

            # PHASE 2: Invalidate production dashboard cache
            cache_key = make_cache_key("production_dashboard", str(tenant_id))
            await delete_cached(cache_key)
            logger.debug("Invalidated production dashboard cache", cache_key=cache_key, tenant_id=str(tenant_id))

            # Notification failures must not fail the status update
            if self.notification_service:
                try:
                    await self.notification_service.emit_batch_state_changed_notification(
                        tenant_id=tenant_id,
                        batch_id=str(batch.id),
                        product_sku=batch.product_sku or "",
                        product_name=batch.product_name or "Unknown Product",
                        old_status=old_status,
                        new_status=status_update.status.value,
                        quantity=batch.planned_quantity or 0,
                        unit=batch.unit or "units",
                        assigned_to=batch.assigned_to
                    )
                except Exception as notif_error:
                    logger.warning("Failed to emit batch state notification",
                                   error=str(notif_error), batch_id=str(batch_id))

            logger.info("Updated batch status",
                        batch_id=str(batch_id),
                        new_status=status_update.status.value,
                        tenant_id=str(tenant_id))
            return batch
    except Exception as e:
        logger.error("Error updating batch status",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        raise
async def get_dashboard_summary(self, tenant_id: UUID) -> ProductionDashboardSummary:
    """Assemble the figures shown on the production dashboard.

    Combines the active batches, today's batches, the repository metrics
    for the last seven days, a capacity utilisation estimate and the
    average quality score.

    Args:
        tenant_id: Tenant the summary is built for.

    Returns:
        A populated ``ProductionDashboardSummary``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            repo = ProductionBatchRepository(session)
            tenant_key = str(tenant_id)

            running = await repo.get_active_batches(tenant_key)

            today = date.today()
            batches_today = await repo.get_batches_by_date_range(tenant_key, today, today)

            # One row per batch planned for today
            plan_rows = []
            for entry in batches_today:
                finish = entry.planned_end_time
                plan_rows.append({
                    "product_name": entry.product_name,
                    "planned_quantity": entry.planned_quantity,
                    "status": entry.status.value,
                    "completion_time": finish.isoformat() if finish else None
                })

            last_week = await repo.get_production_metrics(
                tenant_key, today - timedelta(days=7), today
            )

            from app.models.production import QualityCheck
            from sqlalchemy import select, func, and_

            # Utilisation = planned quantity / fixed daily capacity, capped at 100 %.
            # The capacity is a hard-coded 8-hour day at 100 units per hour.
            STANDARD_HOURLY_CAPACITY = 100  # units per hour (configurable)
            WORKING_HOURS_PER_DAY = 8
            daily_capacity = STANDARD_HOURLY_CAPACITY * WORKING_HOURS_PER_DAY
            planned_today = sum(b.planned_quantity or 0 for b in batches_today)
            if daily_capacity > 0:
                utilization = planned_today / daily_capacity * 100
            else:
                utilization = 0
            utilization = min(utilization, 100)

            # Average quality score of the checks recorded since midnight UTC
            midnight_utc = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
            todays_average = await repo.session.execute(
                select(func.avg(QualityCheck.quality_score)).where(
                    and_(
                        QualityCheck.tenant_id == tenant_id,
                        QualityCheck.check_time >= midnight_utc
                    )
                )
            )
            quality = todays_average.scalar() or 0.0

            # Nothing usable today: fall back to the last 7 days, then to 8.5
            if quality == 0:
                week_start = datetime.now(timezone.utc) - timedelta(days=7)
                recent_average = await repo.session.execute(
                    select(func.avg(QualityCheck.quality_score)).where(
                        and_(
                            QualityCheck.tenant_id == tenant_id,
                            QualityCheck.check_time >= week_start
                        )
                    )
                )
                quality = recent_average.scalar() or 8.5  # Default fallback

            return ProductionDashboardSummary(
                active_batches=len(running),
                todays_production_plan=plan_rows,
                capacity_utilization=round(utilization, 1),
                on_time_completion_rate=last_week.get("on_time_completion_rate", 0),
                average_quality_score=round(quality, 1),
                total_output_today=sum(b.actual_quantity or 0 for b in batches_today),
                efficiency_percentage=last_week.get("average_yield_percentage", 0)
            )
    except Exception as e:
        logger.error("Error getting dashboard summary",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def get_production_requirements(
    self,
    tenant_id: UUID,
    target_date: Optional[date] = None
) -> Dict[str, Any]:
    """Aggregate the ingredient needs of a day's pending batches.

    For every pending batch with a recipe, the recipes client is asked for
    the ingredients needed for the planned quantity; quantities are summed
    per ingredient id. Missing (``None``) quantities and durations count as
    zero instead of raising ``TypeError``.

    Args:
        tenant_id: Tenant whose batches are inspected.
        target_date: Day to inspect; defaults to today.

    Returns:
        A dict with ``date``, ``total_batches``, ``ingredient_requirements``,
        ``estimated_start_time`` and ``estimated_duration_hours``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        if target_date is None:
            target_date = date.today()

        async with self.database_manager.get_session() as session:
            batch_repo = ProductionBatchRepository(session)
            planned_batches = await batch_repo.get_batches_by_date_range(
                str(tenant_id), target_date, target_date, ProductionStatus.PENDING
            )

            # Sum the required quantity per ingredient across all batches
            total_requirements: Dict[Any, Dict[str, Any]] = {}
            for batch in planned_batches:
                if not batch.recipe_id:
                    continue
                requirements = await self.recipes_client.calculate_ingredients_for_quantity(
                    str(tenant_id), str(batch.recipe_id), batch.planned_quantity
                )
                if not requirements or "requirements" not in requirements:
                    continue
                for req in requirements["requirements"]:
                    ingredient_id = req.get("ingredient_id")
                    # A present-but-null quantity must not break the running sum
                    quantity = req.get("quantity") or 0
                    if ingredient_id in total_requirements:
                        total_requirements[ingredient_id]["quantity"] += quantity
                    else:
                        total_requirements[ingredient_id] = {
                            "ingredient_id": ingredient_id,
                            "ingredient_name": req.get("ingredient_name"),
                            "quantity": quantity,
                            "unit": req.get("unit"),
                            "priority": "medium"
                        }

            # Batches without a planned duration contribute zero minutes
            total_minutes = sum(b.planned_duration_minutes or 0 for b in planned_batches)

            return {
                "date": target_date.isoformat(),
                "total_batches": len(planned_batches),
                "ingredient_requirements": list(total_requirements.values()),
                "estimated_start_time": "06:00:00",
                "estimated_duration_hours": total_minutes / 60
            }
    except Exception as e:
        logger.error("Error getting production requirements",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def _calculate_production_plan(
    self,
    tenant_id: UUID,
    target_date: date,
    demand_data: Optional[Dict[str, Any]],
    stock_levels: Optional[Dict[str, Any]],
    recipe_data: Optional[Dict[str, Any]],
    available_capacity: Dict[str, Any]
) -> DailyProductionRequirements:
    """Turn demand and stock figures into a daily production plan.

    For each demand item the shortfall ``demand - stock`` (never negative)
    is the recommended production. An item is "high" urgency when demand
    exceeds twice the current stock, otherwise "medium". Capacity need is
    estimated at 0.5 hours per unit.

    Args:
        tenant_id: Not read by this method.
        target_date: Date stamped on the resulting plan.
        demand_data: Mapping with a ``"demand_items"`` list, or ``None``.
        stock_levels: Mapping with a ``"stock_levels"`` list, or ``None``.
        recipe_data: Not read by this method.
        available_capacity: Mapping whose ``"total_hours"`` (default 8.0)
            is compared against the estimated need.

    Returns:
        The assembled ``DailyProductionRequirements``.
    """
    production_plan = []
    total_capacity_needed = 0.0
    urgent_items = 0

    if demand_data and "demand_items" in demand_data:
        # Index stock once instead of rescanning the list for every demand item.
        # setdefault keeps the first entry per product, like the former scan did.
        stock_by_product: Dict[Any, Any] = {}
        if stock_levels and "stock_levels" in stock_levels:
            for stock in stock_levels["stock_levels"]:
                stock_by_product.setdefault(
                    stock.get("product_id"), stock.get("available_quantity", 0)
                )

        for item in demand_data["demand_items"]:
            product_id = item.get("product_id")
            # Null quantities are treated as zero rather than breaking arithmetic
            demand_quantity = item.get("quantity") or 0
            current_stock = stock_by_product.get(product_id) or 0

            production_needed = max(0, demand_quantity - current_stock)
            if production_needed <= 0:
                continue

            urgency = "high" if demand_quantity > current_stock * 2 else "medium"
            if urgency == "high":
                urgent_items += 1

            # Estimate capacity needed (simplified): 30 minutes per unit
            total_capacity_needed += production_needed * 0.5

            production_plan.append({
                "product_id": product_id,
                "product_name": item.get("product_name", f"Product {product_id}"),
                "current_inventory": current_stock,
                "demand_forecast": demand_quantity,
                "pre_orders": item.get("pre_orders", 0),
                "recommended_production": production_needed,
                "urgency": urgency
            })

    available_hours = available_capacity.get("total_hours", 8.0)
    return DailyProductionRequirements(
        date=target_date,
        production_plan=production_plan,
        total_capacity_needed=total_capacity_needed,
        available_capacity=available_hours,
        capacity_gap=max(0, total_capacity_needed - available_hours),
        urgent_items=urgent_items,
        recommended_schedule=None
    )
async def _calculate_available_capacity(
self,
capacity_repo: ProductionCapacityRepository,
tenant_id: UUID,
target_date: date
) -> Dict[str, Any]:
"""Calculate available production capacity for a date"""
try:
# Get capacity entries for the date
equipment_capacity = await capacity_repo.get_available_capacity(
str(tenant_id), "equipment", target_date, 0
)
staff_capacity = await capacity_repo.get_available_capacity(
str(tenant_id), "staff", target_date, 0
)
# Calculate total available hours (simplified)
total_equipment_hours = sum(c.remaining_capacity_units for c in equipment_capacity)
total_staff_hours = sum(c.remaining_capacity_units for c in staff_capacity)
# Capacity is limited by the minimum of equipment or staff
effective_hours = min(total_equipment_hours, total_staff_hours) if total_staff_hours > 0 else total_equipment_hours
return {
"total_hours": effective_hours,
"equipment_hours": total_equipment_hours,
"staff_hours": total_staff_hours,
"utilization_percentage": 0 # To be calculated
}
except Exception as e:
logger.error("Error calculating available capacity", error=str(e))
# Return default capacity if calculation fails
return {
"total_hours": 8.0,
"equipment_hours": 8.0,
"staff_hours": 8.0,
"utilization_percentage": 0
}
async def _update_inventory_on_completion(
    self,
    tenant_id: UUID,
    batch: ProductionBatch,
    actual_quantity: float
):
    """Report a completed batch's output to the inventory client.

    Best effort: a failure is logged and swallowed so that it cannot block
    the batch completion itself.

    Args:
        tenant_id: Tenant that owns the batch.
        batch: Completed batch; its product id and batch number are used.
        actual_quantity: Quantity passed to the inventory client.
    """
    try:
        reason = f"Production batch {batch.batch_number} completed"
        outcome = await self.inventory_client.update_stock_level(
            str(tenant_id), str(batch.product_id), actual_quantity, reason
        )
        logger.info("Updated inventory after production completion",
                    batch_id=str(batch.id),
                    product_id=str(batch.product_id),
                    quantity_added=actual_quantity,
                    update_result=outcome)
    except Exception as e:
        # Deliberately not re-raised: inventory trouble must not undo completion
        logger.error("Error updating inventory on batch completion",
                     error=str(e), batch_id=str(batch.id))
# Additional Batch Methods
async def update_production_batch(
    self,
    tenant_id: UUID,
    batch_id: UUID,
    batch_update: ProductionBatchUpdate
) -> ProductionBatch:
    """Apply a partial update to a production batch.

    Only the fields of ``batch_update`` that are not ``None`` are sent to
    the repository.

    Args:
        tenant_id: Used for logging only in this method.
        batch_id: Batch to update.
        batch_update: Fields to change.

    Returns:
        The batch returned by the repository.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            changes = batch_update.model_dump(exclude_none=True)
            updated = await ProductionBatchRepository(session).update_batch(batch_id, changes)
            logger.info("Updated production batch",
                        batch_id=str(batch_id), tenant_id=str(tenant_id))
            return updated
    except Exception as e:
        logger.error("Error updating production batch",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        raise
async def delete_production_batch(self, tenant_id: UUID, batch_id: UUID):
    """Delete a production batch that has not been started.

    The batch is looked up for the given tenant, so a batch id belonging to
    another tenant is reported as not found rather than deleted.

    Args:
        tenant_id: Tenant that must own the batch.
        batch_id: Batch to delete.

    Raises:
        ValueError: If the batch is not found for this tenant, or if it is
            in progress or completed.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            batch_repo = ProductionBatchRepository(session)

            # Tenant-scoped lookup; a missing batch gets a clear error
            # instead of an AttributeError on ``batch.status``
            batch = await batch_repo.get_batch(tenant_id, batch_id)
            if not batch:
                raise ValueError(f"Batch {batch_id} not found")

            if batch.status in (ProductionStatus.IN_PROGRESS, ProductionStatus.COMPLETED):
                raise ValueError("Cannot delete batch that is in progress or completed")

            await batch_repo.delete_batch(batch_id)
            logger.info("Deleted production batch",
                        batch_id=str(batch_id), tenant_id=str(tenant_id))
    except Exception as e:
        logger.error("Error deleting production batch",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        raise
async def start_production_batch(self, tenant_id: UUID, batch_id: UUID) -> ProductionBatch:
    """Mark a batch as started and tell interested parties about it.

    After the repository has started the batch, a "batch started"
    notification is emitted (when a notification service is configured)
    and the ``production_delay`` and ``batch_at_risk`` alerts for the batch
    are acknowledged. Both follow-up steps are best effort: their failures
    are logged as warnings and do not fail the start.

    Args:
        tenant_id: Tenant that owns the batch.
        batch_id: Batch to start.

    Returns:
        The started batch.

    Raises:
        Exception: A failure to start the batch is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            started = await ProductionBatchRepository(session).start_batch(batch_id)
            logger.info("Started production batch",
                        batch_id=str(batch_id), tenant_id=str(tenant_id))

            notifier = self.notification_service
            if notifier:
                try:
                    await notifier.emit_batch_started_notification(
                        tenant_id=tenant_id,
                        batch_id=str(started.id),
                        product_sku=started.product_sku or "",
                        product_name=started.product_name or "Unknown Product",
                        quantity_planned=started.planned_quantity or 0,
                        unit=started.unit or "units",
                        estimated_duration_minutes=started.planned_duration_minutes,
                        assigned_to=started.assigned_to
                    )
                except Exception as notif_error:
                    logger.warning("Failed to emit batch started notification",
                                   error=str(notif_error), batch_id=str(batch_id))

            # Alerts about this batch are moot once it has started (non-blocking)
            try:
                from shared.clients.alert_processor_client import get_alert_processor_client
                alert_client = get_alert_processor_client(self.config, "production")
                for alert_type in ("production_delay", "batch_at_risk"):
                    await alert_client.acknowledge_alerts_by_metadata(
                        tenant_id=tenant_id,
                        alert_type=alert_type,
                        metadata_filter={"batch_id": str(batch_id)}
                    )
                logger.debug("Acknowledged production delay alerts", batch_id=str(batch_id))
            except Exception as e:
                # Log but don't fail the batch start
                logger.warning("Failed to acknowledge production alerts", batch_id=str(batch_id), error=str(e))

            return started
    except Exception as e:
        logger.error("Error starting production batch",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        raise
async def complete_production_batch(
    self,
    tenant_id: UUID,
    batch_id: UUID,
    completion_data: Optional[Dict[str, Any]] = None
) -> ProductionBatch:
    """Complete a batch, update inventory and emit a completion notification.

    Inventory is only updated when the completed batch has a truthy
    ``actual_quantity``. The notification is best effort: a failure is
    logged as a warning and does not fail the completion.

    Args:
        tenant_id: Tenant that owns the batch.
        batch_id: Batch to complete.
        completion_data: Optional completion details; an empty dict is sent
            to the repository when omitted.

    Returns:
        The completed batch.

    Raises:
        Exception: A failure to complete the batch is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            repository = ProductionBatchRepository(session)
            finished = await repository.complete_batch(batch_id, completion_data or {})

            if finished.actual_quantity:
                await self._update_inventory_on_completion(
                    tenant_id, finished, finished.actual_quantity
                )

            logger.info("Completed production batch",
                        batch_id=str(batch_id), tenant_id=str(tenant_id))

            notifier = self.notification_service
            if notifier:
                try:
                    # The duration is only known when both timestamps were recorded
                    elapsed_minutes = None
                    if finished.actual_start_time and finished.actual_end_time:
                        elapsed = finished.actual_end_time - finished.actual_start_time
                        elapsed_minutes = int(elapsed.total_seconds() / 60)

                    await notifier.emit_batch_completed_notification(
                        tenant_id=tenant_id,
                        batch_id=str(finished.id),
                        product_sku=finished.product_sku or "",
                        product_name=finished.product_name or "Unknown Product",
                        quantity_produced=finished.actual_quantity or finished.planned_quantity or 0,
                        unit=finished.unit or "units",
                        production_duration_minutes=elapsed_minutes,
                        quality_score=finished.quality_score
                    )
                except Exception as notif_error:
                    logger.warning("Failed to emit batch completed notification",
                                   error=str(notif_error), batch_id=str(batch_id))

            return finished
    except Exception as e:
        logger.error("Error completing production batch",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        raise
async def complete_production_batch_with_transformation(
    self,
    tenant_id: UUID,
    batch_id: UUID,
    completion_data: Optional[Dict[str, Any]] = None,
    transformation_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Complete a batch and optionally apply a follow-up transformation.

    The batch is completed and, when it has a truthy ``actual_quantity``,
    inventory is updated. A transformation is attempted only when
    ``transformation_data`` is given and the batch has an actual quantity.

    Args:
        tenant_id: Tenant that owns the batch.
        batch_id: Batch to complete.
        completion_data: Optional completion details.
        transformation_data: Optional transformation parameters.

    Returns:
        ``{"batch": <batch as dict>, "transformation": <result or None>}``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            repository = ProductionBatchRepository(session)
            finished = await repository.complete_batch(batch_id, completion_data or {})

            produced = finished.actual_quantity
            if produced:
                await self._update_inventory_on_completion(tenant_id, finished, produced)

            batch_snapshot = finished.to_dict()

            # Transformation needs both a request and an actual produced quantity
            transformation = None
            if transformation_data and finished.actual_quantity:
                transformation = await self._apply_batch_transformation(
                    tenant_id, finished, transformation_data
                )

            logger.info("Completed production batch with transformation",
                        batch_id=str(batch_id),
                        has_transformation=bool(transformation_data),
                        tenant_id=str(tenant_id))
            return {
                "batch": batch_snapshot,
                "transformation": transformation
            }
    except Exception as e:
        logger.error("Error completing production batch with transformation",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        raise
async def transform_par_baked_products(
self,
tenant_id: UUID,
source_ingredient_id: UUID,
target_ingredient_id: UUID,
quantity: float,
batch_reference: Optional[str] = None,
expiration_hours: int = 24,
notes: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""Transform par-baked products to finished products"""
try:
# Use the inventory client to create the transformation
transformation_result = await self.inventory_client.create_par_bake_transformation(
source_ingredient_id=source_ingredient_id,
target_ingredient_id=target_ingredient_id,
quantity=quantity,
tenant_id=str(tenant_id),
target_batch_number=batch_reference,
expiration_hours=expiration_hours,
notes=notes
)
if transformation_result:
logger.info("Created par-baked transformation",
transformation_id=transformation_result.get('transformation_id'),
source_ingredient=str(source_ingredient_id),
target_ingredient=str(target_ingredient_id),
quantity=quantity,
tenant_id=str(tenant_id))
return transformation_result
except Exception as e:
logger.error("Error transforming par-baked products",
error=str(e),
source_ingredient=str(source_ingredient_id),
target_ingredient=str(target_ingredient_id),
tenant_id=str(tenant_id))
raise
async def _apply_batch_transformation(
self,
tenant_id: UUID,
batch: ProductionBatch,
transformation_data: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Apply transformation after batch completion"""
try:
# Extract transformation parameters
source_ingredient_id = transformation_data.get('source_ingredient_id')
target_ingredient_id = transformation_data.get('target_ingredient_id')
transform_quantity = transformation_data.get('quantity', batch.actual_quantity)
expiration_hours = transformation_data.get('expiration_hours', 24)
notes = transformation_data.get('notes', f"Transformation from batch {batch.batch_number}")
if not source_ingredient_id or not target_ingredient_id:
logger.warning("Missing ingredient IDs for transformation",
batch_id=str(batch.id), transformation_data=transformation_data)
return None
# Create the transformation
transformation_result = await self.transform_par_baked_products(
tenant_id=tenant_id,
source_ingredient_id=UUID(source_ingredient_id),
target_ingredient_id=UUID(target_ingredient_id),
quantity=transform_quantity,
batch_reference=batch.batch_number,
expiration_hours=expiration_hours,
notes=notes
)
return transformation_result
except Exception as e:
logger.error("Error applying batch transformation",
error=str(e), batch_id=str(batch.id), tenant_id=str(tenant_id))
return None
async def get_batch_statistics(
    self,
    tenant_id: UUID,
    start_date: date,
    end_date: date
) -> Dict[str, Any]:
    """Fetch the repository's batch statistics for a date range.

    Args:
        tenant_id: Tenant whose batches are summarised.
        start_date: First date passed to the repository.
        end_date: Second date passed to the repository.

    Returns:
        The statistics exactly as returned by the repository.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            repository = ProductionBatchRepository(session)
            return await repository.get_batch_statistics(
                str(tenant_id), start_date, end_date
            )
    except Exception as e:
        logger.error("Error getting batch statistics",
                     error=str(e), tenant_id=str(tenant_id))
        raise
# Production Schedule Methods
async def create_production_schedule(
    self,
    tenant_id: UUID,
    schedule_data: ProductionScheduleCreate
) -> ProductionSchedule:
    """Persist a new production schedule for a tenant.

    Args:
        tenant_id: Owner of the schedule; added to the stored payload.
        schedule_data: Creation payload.

    Returns:
        The schedule returned by the repository.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            payload = schedule_data.model_dump()
            payload["tenant_id"] = tenant_id

            created = await ProductionScheduleRepository(session).create_schedule(payload)
            logger.info("Created production schedule",
                        schedule_id=str(created.id), tenant_id=str(tenant_id))
            return created
    except Exception as e:
        logger.error("Error creating production schedule",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def update_production_schedule(
    self,
    tenant_id: UUID,
    schedule_id: UUID,
    schedule_update: ProductionScheduleUpdate
) -> ProductionSchedule:
    """Apply a partial update to a production schedule.

    Only the fields of ``schedule_update`` that are not ``None`` are sent
    to the repository.

    Args:
        tenant_id: Used for logging only in this method.
        schedule_id: Schedule to update.
        schedule_update: Fields to change.

    Returns:
        The schedule returned by the repository.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            changes = schedule_update.model_dump(exclude_none=True)
            updated = await ProductionScheduleRepository(session).update_schedule(
                schedule_id, changes
            )
            logger.info("Updated production schedule",
                        schedule_id=str(schedule_id), tenant_id=str(tenant_id))
            return updated
    except Exception as e:
        logger.error("Error updating production schedule",
                     error=str(e), schedule_id=str(schedule_id), tenant_id=str(tenant_id))
        raise
async def finalize_production_schedule(
    self,
    tenant_id: UUID,
    schedule_id: UUID
) -> ProductionSchedule:
    """Finalize a production schedule through the repository.

    Args:
        tenant_id: Used for logging only in this method.
        schedule_id: Schedule to finalize.

    Returns:
        The schedule returned by the repository.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        async with self.database_manager.get_session() as session:
            finalized = await ProductionScheduleRepository(session).finalize_schedule(schedule_id)
            logger.info("Finalized production schedule",
                        schedule_id=str(schedule_id), tenant_id=str(tenant_id))
            return finalized
    except Exception as e:
        logger.error("Error finalizing production schedule",
                     error=str(e), schedule_id=str(schedule_id), tenant_id=str(tenant_id))
        raise
async def optimize_schedule(self, tenant_id: UUID, target_date: date) -> Dict[str, Any]:
    """Return schedule optimisation suggestions (currently a fixed mock).

    No optimisation is performed yet: the same canned suggestion is
    returned for every tenant and date. The body cannot fail, so it has no
    error handling.

    Args:
        tenant_id: Accepted for interface stability; not read yet.
        target_date: Accepted for interface stability; not read yet.

    Returns:
        A dict with ``optimized``, ``suggestions`` and
        ``predicted_efficiency``.
    """
    # Mock AI optimization for now
    return {
        "optimized": True,
        "suggestions": [
            {
                "type": "reschedule",
                "message": "Move croissant production to 6 AM to avoid oven congestion",
                "impact": "Reduces wait time by 30 minutes"
            }
        ],
        "predicted_efficiency": 92.5
    }
async def get_capacity_usage_report(
    self,
    tenant_id: UUID,
    start_date: date,
    end_date: date
) -> Dict[str, Any]:
    """Fetch the capacity usage report for a tenant over a date range.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        start_date: First date of the reporting range.
        end_date: Last date of the reporting range.

    Returns:
        The value returned by
        ``ProductionCapacityRepository.get_capacity_usage_report``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionCapacityRepository(db)
            return await repo.get_capacity_usage_report(
                str(tenant_id), start_date, end_date
            )
    except Exception as exc:
        logger.error("Error getting capacity usage report",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
# Capacity Methods
async def get_capacity_by_date(
    self,
    tenant_id: UUID,
    target_date: date
) -> List[Dict[str, Any]]:
    """List the capacity entries of a tenant for one date.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        target_date: Date whose capacity entries are requested.

    Returns:
        One ``to_dict()`` result per entry returned by the repository.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionCapacityRepository(db)
            entries = await repo.get_capacity_by_date(str(tenant_id), target_date)
            # Serialise each entry for the API response
            serialised = []
            for entry in entries:
                serialised.append(entry.to_dict())
            return serialised
    except Exception as exc:
        logger.error("Error getting capacity by date",
                     error=str(exc), tenant_id=str(tenant_id), date=target_date.isoformat())
        raise
async def get_capacity_list(
    self,
    tenant_id: UUID,
    filters: Dict[str, Any],
    page: int,
    page_size: int
) -> Dict[str, Any]:
    """Fetch a filtered, paginated capacity listing for a tenant.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        filters: Filter mapping forwarded unchanged to the repository.
        page: Page number forwarded to the repository.
        page_size: Page size forwarded to the repository.

    Returns:
        The value returned by ``ProductionCapacityRepository.get_capacity_list``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionCapacityRepository(db)
            return await repo.get_capacity_list(
                str(tenant_id), filters, page, page_size
            )
    except Exception as exc:
        logger.error("Error getting capacity list",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def check_resource_availability(
    self,
    tenant_id: UUID,
    resource_id: str,
    start_time: datetime,
    end_time: datetime
) -> Dict[str, Any]:
    """Ask the capacity repository whether a resource is free in a time window.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        resource_id: Identifier of the resource to check.
        start_time: Start of the window.
        end_time: End of the window.

    Returns:
        The value returned by
        ``ProductionCapacityRepository.check_resource_availability``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionCapacityRepository(db)
            return await repo.check_resource_availability(
                str(tenant_id), resource_id, start_time, end_time
            )
    except Exception as exc:
        logger.error("Error checking resource availability",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def reserve_capacity(
    self,
    tenant_id: UUID,
    reservation_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Reserve capacity for a tenant.

    Args:
        tenant_id: Tenant identifier; stored in the payload as a string
            under ``"tenant_id"`` (overriding any value supplied by the caller).
        reservation_data: Reservation fields. The mapping is not modified.

    Returns:
        The value returned by ``ProductionCapacityRepository.reserve_capacity``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as session:
            capacity_repo = ProductionCapacityRepository(session)
            # Build a new dict instead of writing tenant_id into the caller's
            # mapping, so the argument is left untouched.
            payload = {**reservation_data, "tenant_id": str(tenant_id)}
            reservation = await capacity_repo.reserve_capacity(payload)
            return reservation
    except Exception as e:
        logger.error("Error reserving capacity",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def update_capacity(
    self,
    tenant_id: UUID,
    capacity_id: UUID,
    update_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Update a capacity entry through the capacity repository.

    Args:
        tenant_id: Tenant identifier; used in this method only for error logging.
        capacity_id: Identifier of the capacity entry to update.
        update_data: Fields to change, forwarded unchanged to the repository.

    Returns:
        The value returned by ``ProductionCapacityRepository.update_capacity``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    # NOTE(review): tenant_id is not passed to the repository call; confirm
    # that tenant ownership of capacity_id is enforced elsewhere.
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionCapacityRepository(db)
            return await repo.update_capacity(capacity_id, update_data)
    except Exception as exc:
        logger.error("Error updating capacity",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def predict_capacity_bottlenecks(
    self,
    tenant_id: UUID,
    days_ahead: int
) -> Dict[str, Any]:
    """Return predicted capacity bottlenecks.

    The payload is currently a fixed placeholder containing a single
    bottleneck dated tomorrow; ``days_ahead`` does not influence it.

    Args:
        tenant_id: Tenant identifier (only used for error logging).
        days_ahead: Prediction horizon (currently unused).

    Returns:
        Dict with a ``bottlenecks`` list.
    """
    try:
        # Placeholder prediction until a real model is wired in.
        tomorrow = date.today() + timedelta(days=1)
        oven_bottleneck = {
            "date": tomorrow.isoformat(),
            "time_slot": "06:00-07:00",
            "resource_name": "Oven #3",
            "predicted_utilization": 95.0,
            "severity": "high",
            "suggestion": "Consider scheduling lighter load items during this period"
        }
        return {"bottlenecks": [oven_bottleneck]}
    except Exception as exc:
        logger.error("Error predicting capacity bottlenecks",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
# Quality Methods
async def get_quality_checks_list(
    self,
    tenant_id: UUID,
    filters: Dict[str, Any],
    page: int,
    page_size: int
) -> Dict[str, Any]:
    """Fetch a filtered, paginated list of quality checks for a tenant.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        filters: Filter mapping forwarded unchanged to the repository.
        page: Page number forwarded to the repository.
        page_size: Page size forwarded to the repository.

    Returns:
        The value returned by ``QualityCheckRepository.get_quality_checks_list``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = QualityCheckRepository(db)
            return await repo.get_quality_checks_list(
                str(tenant_id), filters, page, page_size
            )
    except Exception as exc:
        logger.error("Error getting quality checks list",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_batch_quality_checks(
    self,
    tenant_id: UUID,
    batch_id: UUID
) -> Dict[str, Any]:
    """Fetch the quality checks recorded for one production batch.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        batch_id: Batch identifier; passed to the repository as a string.

    Returns:
        ``{"quality_checks": [...]}`` with one ``to_dict()`` result per check.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = QualityCheckRepository(db)
            found = await repo.get_checks_by_batch(str(tenant_id), str(batch_id))
            serialised = []
            for item in found:
                serialised.append(item.to_dict())
            return {"quality_checks": serialised}
    except Exception as exc:
        logger.error("Error getting batch quality checks",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def create_quality_check(
    self,
    tenant_id: UUID,
    quality_check_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Create a quality check for a tenant.

    Args:
        tenant_id: Tenant identifier; stored in the payload as a string
            under ``"tenant_id"`` (overriding any value supplied by the caller).
        quality_check_data: Quality check fields. The mapping is not modified.

    Returns:
        ``to_dict()`` of the object returned by
        ``QualityCheckRepository.create_quality_check``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as session:
            quality_repo = QualityCheckRepository(session)
            # Build a new dict instead of writing tenant_id into the caller's
            # mapping, so the argument is left untouched.
            payload = {**quality_check_data, "tenant_id": str(tenant_id)}
            check = await quality_repo.create_quality_check(payload)
            return check.to_dict()
    except Exception as e:
        logger.error("Error creating quality check",
                     error=str(e), tenant_id=str(tenant_id))
        raise
async def update_quality_check(
    self,
    tenant_id: UUID,
    check_id: UUID,
    update_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Update a quality check through the quality check repository.

    Args:
        tenant_id: Tenant identifier; used in this method only for error logging.
        check_id: Identifier of the quality check to update.
        update_data: Fields to change, forwarded unchanged to the repository.

    Returns:
        ``to_dict()`` of the object returned by
        ``QualityCheckRepository.update_quality_check``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    # NOTE(review): tenant_id is not passed to the repository call; confirm
    # that tenant ownership of check_id is enforced elsewhere.
    try:
        async with self.database_manager.get_session() as db:
            repo = QualityCheckRepository(db)
            updated = await repo.update_quality_check(check_id, update_data)
            return updated.to_dict()
    except Exception as exc:
        logger.error("Error updating quality check",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_quality_trends(
    self,
    tenant_id: UUID,
    start_date: date,
    end_date: date
) -> Dict[str, Any]:
    """Fetch quality trends for a tenant over a date range.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        start_date: First date of the range.
        end_date: Last date of the range.

    Returns:
        The value returned by ``QualityCheckRepository.get_quality_trends``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = QualityCheckRepository(db)
            return await repo.get_quality_trends(str(tenant_id), start_date, end_date)
    except Exception as exc:
        logger.error("Error getting quality trends",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_quality_alerts(self, tenant_id: UUID) -> Dict[str, Any]:
    """Fetch the quality alerts of a tenant.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.

    Returns:
        The value returned by ``QualityCheckRepository.get_quality_alerts``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = QualityCheckRepository(db)
            return await repo.get_quality_alerts(str(tenant_id))
    except Exception as exc:
        logger.error("Error getting quality alerts",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
# Analytics Methods
async def get_performance_analytics(
    self,
    tenant_id: UUID,
    start_date: date,
    end_date: date
) -> Dict[str, Any]:
    """Fetch production performance analytics for a tenant over a date range.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        start_date: First date of the range.
        end_date: Last date of the range.

    Returns:
        The value returned by
        ``ProductionBatchRepository.get_performance_analytics``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionBatchRepository(db)
            return await repo.get_performance_analytics(
                str(tenant_id), start_date, end_date
            )
    except Exception as exc:
        logger.error("Error getting performance analytics",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_yield_trends_analytics(
    self,
    tenant_id: UUID,
    period: str
) -> Dict[str, Any]:
    """Fetch yield trends for a tenant.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.
        period: Period selector forwarded unchanged to the repository.

    Returns:
        The value returned by ``ProductionBatchRepository.get_yield_trends``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionBatchRepository(db)
            return await repo.get_yield_trends(str(tenant_id), period)
    except Exception as exc:
        logger.error("Error getting yield trends analytics",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_top_defects_analytics(self, tenant_id: UUID) -> Dict[str, Any]:
    """Fetch the top defects of a tenant.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.

    Returns:
        The value returned by ``QualityCheckRepository.get_top_defects``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = QualityCheckRepository(db)
            return await repo.get_top_defects(str(tenant_id))
    except Exception as exc:
        logger.error("Error getting top defects analytics",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_equipment_efficiency_analytics(self, tenant_id: UUID) -> Dict[str, Any]:
    """Fetch equipment efficiency figures for a tenant.

    Args:
        tenant_id: Tenant identifier; passed to the repository as a string.

    Returns:
        The value returned by
        ``ProductionCapacityRepository.get_equipment_efficiency``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            repo = ProductionCapacityRepository(db)
            return await repo.get_equipment_efficiency(str(tenant_id))
    except Exception as exc:
        logger.error("Error getting equipment efficiency analytics",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def generate_analytics_report(
    self,
    tenant_id: UUID,
    report_config: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate an analytics report descriptor.

    Report generation is currently a placeholder: no report is rendered,
    only identifying metadata is returned.

    Args:
        tenant_id: Tenant identifier, embedded in the report id and URL.
        report_config: Report configuration, echoed back under ``"config"``.

    Returns:
        Dict with ``report_id``, ``generated_at``, ``config`` and
        ``download_url``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        # Read the clock once so that the date embedded in report_id and
        # download_url always agrees with generated_at, even when the call
        # straddles midnight.
        generated_at = datetime.now()
        report_date = generated_at.date().isoformat()
        return {
            "report_id": f"report_{tenant_id}_{report_date}",
            "generated_at": generated_at.isoformat(),
            "config": report_config,
            "download_url": f"/reports/production_{tenant_id}_{report_date}.pdf"
        }
    except Exception as e:
        logger.error("Error generating analytics report",
                     error=str(e), tenant_id=str(tenant_id))
        raise
# ================================================================
# TRANSFORMATION METHODS FOR PRODUCTION
# ================================================================
async def get_production_transformations(
    self,
    tenant_id: UUID,
    days_back: int = 30,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """Fetch PAR_BAKED -> FULLY_BAKED transformations from the inventory client.

    Args:
        tenant_id: Tenant identifier; sent to the inventory client as a string.
        days_back: Look-back window in days, forwarded to the client.
        limit: Maximum number of records, forwarded to the client.

    Returns:
        The client's result, or an empty list if anything fails (the error
        is logged, not raised).
    """
    tenant = str(tenant_id)
    try:
        found = await self.inventory_client.get_transformations(
            tenant_id=tenant,
            source_stage="PAR_BAKED",
            target_stage="FULLY_BAKED",
            days_back=days_back,
            limit=limit
        )
        logger.info("Retrieved production transformations",
                    count=len(found), tenant_id=tenant)
        return found
    except Exception as exc:
        # Best-effort: callers get an empty list instead of an exception.
        logger.error("Error getting production transformations",
                     error=str(exc), tenant_id=tenant)
        return []
async def get_transformation_efficiency_metrics(
    self,
    tenant_id: UUID,
    days_back: int = 30
) -> Dict[str, Any]:
    """Get transformation efficiency metrics for production analytics.

    Reads the transformation summary from the inventory client and reshapes
    it. Every return path yields the same set of keys.

    Args:
        tenant_id: Tenant identifier; sent to the inventory client as a string.
        days_back: Look-back window in days; also the divisor for the
            per-day transformation rate (clamped to at least 1).

    Returns:
        Dict with ``par_baked_to_fully_baked``, ``period_days``,
        ``transformation_rate`` and ``total_transformations``. All figures
        are zero when the summary is empty or an error occurs (errors are
        logged, not raised).
    """
    def _zeroed_metrics() -> Dict[str, Any]:
        # Single source for the "no data" payload so every path has the
        # same shape, including the top-level total_transformations key.
        return {
            "par_baked_to_fully_baked": {
                "total_transformations": 0,
                "total_quantity_transformed": 0.0,
                "average_conversion_ratio": 0.0,
                "efficiency_percentage": 0.0
            },
            "period_days": days_back,
            "transformation_rate": 0.0,
            "total_transformations": 0
        }

    try:
        # Get transformation summary from inventory service
        summary = await self.inventory_client.get_transformation_summary(
            tenant_id=str(tenant_id),
            days_back=days_back
        )
        if not summary:
            return _zeroed_metrics()

        # `or` (rather than a .get default) also covers keys that are
        # present but null, which would otherwise break the arithmetic below.
        par_baked_metrics = summary.get("par_baked_to_fully_baked") or {}
        total_transformations = summary.get("total_transformations") or 0
        conversion_ratio = par_baked_metrics.get("average_conversion_ratio") or 0.0

        # Transformations per day over the requested window
        transformation_rate = total_transformations / max(days_back, 1)

        result = {
            "par_baked_to_fully_baked": {
                "total_transformations": par_baked_metrics.get("count") or 0,
                "total_quantity_transformed": par_baked_metrics.get("total_source_quantity") or 0.0,
                "average_conversion_ratio": conversion_ratio,
                "efficiency_percentage": conversion_ratio * 100
            },
            "period_days": days_back,
            "transformation_rate": round(transformation_rate, 2),
            "total_transformations": total_transformations
        }
        logger.info("Retrieved transformation efficiency metrics",
                    total_transformations=total_transformations,
                    transformation_rate=transformation_rate,
                    tenant_id=str(tenant_id))
        return result
    except Exception as e:
        # Best-effort: analytics callers get zeroed metrics instead of an error.
        logger.error("Error getting transformation efficiency metrics",
                     error=str(e), tenant_id=str(tenant_id))
        return _zeroed_metrics()
async def get_batch_with_transformations(
    self,
    tenant_id: UUID,
    batch_id: UUID
) -> Dict[str, Any]:
    """Get batch details with associated transformations.

    A transformation is associated with the batch when its
    ``target_batch_number`` equals the batch number, or when the batch
    number occurs in its ``process_notes``.

    Args:
        tenant_id: Tenant that must own the batch.
        batch_id: Identifier of the batch to load.

    Returns:
        Dict with ``batch``, ``transformations`` and ``transformation_count``;
        an empty dict when the batch is missing, belongs to another tenant,
        or an error occurs (errors are logged, not raised).
    """
    try:
        async with self.database_manager.get_session() as session:
            batch_repo = ProductionBatchRepository(session)
            batch = await batch_repo.get(batch_id)
            if not batch or str(batch.tenant_id) != str(tenant_id):
                return {}
            # Capture everything needed from the batch while the session is open
            batch_data = batch.to_dict()
            batch_number = batch.batch_number

        # The remote call runs after leaving the session block so the
        # database session is not held open while waiting on another service.
        transformations = await self.inventory_client.get_transformations(
            tenant_id=str(tenant_id),
            days_back=7,  # Look in recent transformations
            limit=50
        ) or []

        def _references_batch(transformation: Dict[str, Any]) -> bool:
            if transformation.get('target_batch_number') == batch_number:
                return True
            # The key may be present with a null value, so a .get default
            # is not enough.
            notes = transformation.get('process_notes') or ''
            return batch_number in notes

        # Without a batch number nothing can be matched; an empty string
        # would otherwise be "found" in every notes field.
        batch_transformations = (
            [t for t in transformations if _references_batch(t)]
            if batch_number else []
        )

        result = {
            "batch": batch_data,
            "transformations": batch_transformations,
            "transformation_count": len(batch_transformations)
        }
        logger.info("Retrieved batch with transformations",
                    batch_id=str(batch_id),
                    transformation_count=len(batch_transformations),
                    tenant_id=str(tenant_id))
        return result
    except Exception as e:
        # Best-effort: callers get an empty dict instead of an exception.
        logger.error("Error getting batch with transformations",
                     error=str(e), batch_id=str(batch_id), tenant_id=str(tenant_id))
        return {}
# ================================================================
# EQUIPMENT MANAGEMENT METHODS
# ================================================================
async def get_equipment_list(
    self,
    tenant_id: UUID,
    filters: Dict[str, Any],
    page: int = 1,
    page_size: int = 50
) -> Dict[str, Any]:
    """List a tenant's equipment with filtering and pagination.

    Filter entries whose value is ``None`` are dropped, and ``tenant_id`` is
    always set from the argument.

    Args:
        tenant_id: Tenant identifier; added to the filters as a string.
        filters: Caller-supplied filters.
        page: Page number forwarded to the repository.
        page_size: Page size forwarded to the repository.

    Returns:
        Dict with ``equipment`` (validated ``EquipmentResponse`` objects),
        ``total_count``, ``page`` and ``page_size``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.equipment_repository import EquipmentRepository
            repo = EquipmentRepository(db)

            # Keep only the filters that were actually provided
            active_filters = {}
            for key, value in filters.items():
                if value is not None:
                    active_filters[key] = value
            active_filters["tenant_id"] = str(tenant_id)

            rows = await repo.get_equipment_filtered(active_filters, page, page_size)
            total = await repo.count_equipment_filtered(active_filters)

            from app.schemas.equipment import EquipmentResponse
            items = []
            for row in rows:
                items.append(EquipmentResponse.model_validate(row))

            return {
                "equipment": items,
                "total_count": total,
                "page": page,
                "page_size": page_size
            }
    except Exception as exc:
        logger.error("Error getting equipment list",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def get_equipment(self, tenant_id: UUID, equipment_id: UUID):
    """Look up one equipment item of a tenant.

    Args:
        tenant_id: Tenant identifier passed to the repository lookup.
        equipment_id: Identifier of the equipment item.

    Returns:
        The repository's result, or ``None`` when that result is falsy.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    log_ctx = {"equipment_id": str(equipment_id), "tenant_id": str(tenant_id)}
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.equipment_repository import EquipmentRepository
            found = await EquipmentRepository(db).get_equipment_by_id(tenant_id, equipment_id)
            if found:
                logger.info("Retrieved equipment", **log_ctx)
                return found
            return None
    except Exception as exc:
        logger.error("Error getting equipment", error=str(exc), **log_ctx)
        raise
async def create_equipment(self, tenant_id: UUID, equipment_data):
    """Create an equipment item for a tenant and commit it.

    Args:
        tenant_id: Tenant identifier, stored under ``"tenant_id"`` in the
            payload sent to the repository.
        equipment_data: Object exposing ``model_dump()`` with the equipment fields.

    Returns:
        The value returned by ``EquipmentRepository.create_equipment``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.equipment_repository import EquipmentRepository
            repo = EquipmentRepository(db)

            payload = {**equipment_data.model_dump(), "tenant_id": tenant_id}
            created = await repo.create_equipment(payload)

            # Explicit commit so the new row is persisted
            await db.commit()
            logger.info("Created equipment",
                        equipment_id=str(created.id), tenant_id=str(tenant_id))
            return created
    except Exception as exc:
        logger.error("Error creating equipment",
                     error=str(exc), tenant_id=str(tenant_id))
        raise
async def update_equipment(self, tenant_id: UUID, equipment_id: UUID, equipment_update):
    """Update an equipment item and notify on status changes.

    The item is first looked up by tenant and id; when the lookup yields
    nothing, no update is attempted. After a committed update, a status
    notification is emitted if a notification service is configured and the
    update carries a status that differs from the previous one. A failure to
    emit the notification is logged as a warning and does not fail the update.

    Args:
        tenant_id: Tenant that must own the equipment item.
        equipment_id: Identifier of the equipment item.
        equipment_update: Object exposing ``model_dump(exclude_none=True)``.

    Returns:
        The value returned by ``EquipmentRepository.update_equipment``, or
        ``None`` when the item was not found for this tenant.

    Raises:
        Exception: Any failure outside notification emission is logged and
            re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as session:
            from app.repositories.equipment_repository import EquipmentRepository
            equipment_repo = EquipmentRepository(session)

            # First verify equipment belongs to tenant and capture old status
            equipment = await equipment_repo.get_equipment_by_id(tenant_id, equipment_id)
            if not equipment:
                return None
            old_status = getattr(equipment, 'status', None)

            # Serialise the update once; the repository receives its own copy
            # so the values read below for the notification cannot be altered.
            update_dict = equipment_update.model_dump(exclude_none=True)
            updated_equipment = await equipment_repo.update_equipment(
                equipment_id,
                dict(update_dict)
            )

            # Commit the transaction to persist changes
            await session.commit()
            logger.info("Updated equipment",
                        equipment_id=str(equipment_id), tenant_id=str(tenant_id))

            # Emit equipment status notification if status changed
            new_status = update_dict.get('status')
            if self.notification_service and new_status and old_status and new_status != old_status:
                try:
                    await self.notification_service.emit_equipment_status_notification(
                        tenant_id=tenant_id,
                        equipment_id=str(equipment_id),
                        equipment_name=updated_equipment.name or "Unknown Equipment",
                        old_status=old_status,
                        new_status=new_status,
                        reason=update_dict.get('notes') or update_dict.get('status_reason')
                    )
                except Exception as notif_error:
                    # Notification is best-effort; the update is already committed.
                    logger.warning("Failed to emit equipment status notification",
                                   error=str(notif_error), equipment_id=str(equipment_id))
            return updated_equipment
    except Exception as e:
        logger.error("Error updating equipment",
                     error=str(e), equipment_id=str(equipment_id), tenant_id=str(tenant_id))
        raise
async def delete_equipment(self, tenant_id: UUID, equipment_id: UUID) -> bool:
    """Soft-delete an equipment item of a tenant.

    Args:
        tenant_id: Tenant that must own the equipment item.
        equipment_id: Identifier of the equipment item.

    Returns:
        ``False`` when the item is not found for this tenant; otherwise the
        value returned by ``EquipmentRepository.delete_equipment``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    log_ctx = {"equipment_id": str(equipment_id), "tenant_id": str(tenant_id)}
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.equipment_repository import EquipmentRepository
            repo = EquipmentRepository(db)

            # Ownership check: the lookup is scoped by tenant
            owned = await repo.get_equipment_by_id(tenant_id, equipment_id)
            if not owned:
                return False

            deleted = await repo.delete_equipment(equipment_id)
            # Explicit commit so the change is persisted
            await db.commit()
            logger.info("Deleted equipment", **log_ctx)
            return deleted
    except Exception as exc:
        logger.error("Error deleting equipment", error=str(exc), **log_ctx)
        raise
async def hard_delete_equipment(self, tenant_id: UUID, equipment_id: UUID) -> bool:
    """Permanently delete an equipment item from the database.

    Args:
        tenant_id: Tenant that must own the equipment item.
        equipment_id: Identifier of the equipment item.

    Returns:
        ``False`` when the item is not found for this tenant; otherwise the
        value returned by ``EquipmentRepository.hard_delete_equipment``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as session:
            from app.repositories.equipment_repository import EquipmentRepository
            equipment_repo = EquipmentRepository(session)

            # First verify equipment belongs to tenant
            equipment = await equipment_repo.get_equipment_by_id(tenant_id, equipment_id)
            if not equipment:
                return False

            # Get deletion summary first for logging. Resolve the logged
            # value here, before anything is deleted, so that an empty
            # summary cannot raise after the commit and make a completed
            # delete look like a failure.
            summary = await equipment_repo.get_equipment_deletion_summary(tenant_id, equipment_id)
            affected_batches = (summary or {}).get("production_batches_count", 0)

            # Hard delete equipment
            success = await equipment_repo.hard_delete_equipment(equipment_id)

            # Commit the transaction to persist changes
            await session.commit()
            logger.info("Hard deleted equipment",
                        equipment_id=str(equipment_id),
                        tenant_id=str(tenant_id),
                        affected_batches=affected_batches)
            return success
    except Exception as e:
        logger.error("Error hard deleting equipment",
                     error=str(e), equipment_id=str(equipment_id), tenant_id=str(tenant_id))
        raise
async def get_equipment_deletion_summary(self, tenant_id: UUID, equipment_id: UUID) -> Dict[str, Any]:
    """Fetch the deletion summary of an equipment item.

    Args:
        tenant_id: Tenant identifier passed to the repository.
        equipment_id: Identifier of the equipment item.

    Returns:
        The value returned by
        ``EquipmentRepository.get_equipment_deletion_summary``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    log_ctx = {"equipment_id": str(equipment_id), "tenant_id": str(tenant_id)}
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.equipment_repository import EquipmentRepository
            repo = EquipmentRepository(db)
            deletion_summary = await repo.get_equipment_deletion_summary(tenant_id, equipment_id)
            logger.info("Retrieved equipment deletion summary",
                        can_delete=deletion_summary.get("can_delete", False),
                        **log_ctx) if False else logger.info(
                "Retrieved equipment deletion summary",
                equipment_id=log_ctx["equipment_id"],
                tenant_id=log_ctx["tenant_id"],
                can_delete=deletion_summary.get("can_delete", False))
            return deletion_summary
    except Exception as exc:
        logger.error("Error getting equipment deletion summary",
                     error=str(exc), **log_ctx)
        raise
# ================================================================
# SUSTAINABILITY / WASTE ANALYTICS
# ================================================================
async def get_waste_analytics(
    self,
    tenant_id: UUID,
    start_date: datetime,
    end_date: datetime
) -> Dict[str, Any]:
    """
    Fetch production waste analytics for sustainability tracking

    Called by Inventory Service's sustainability module
    to calculate environmental impact and SDG 12.3 compliance.

    Args:
        tenant_id: Tenant identifier passed to the repository.
        start_date: Start of the analysis window.
        end_date: End of the analysis window.

    Returns:
        The value returned by ``ProductionBatchRepository.get_waste_analytics``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.production_batch_repository import ProductionBatchRepository
            # The repository owns the waste computation
            repo = ProductionBatchRepository(db)
            return await repo.get_waste_analytics(
                tenant_id=tenant_id,
                start_date=start_date,
                end_date=end_date
            )
    except Exception as exc:
        logger.error(
            "Error calculating waste analytics",
            tenant_id=str(tenant_id),
            error=str(exc)
        )
        raise
async def get_baseline_metrics(self, tenant_id: UUID) -> Dict[str, Any]:
    """
    Fetch baseline production metrics from the first 90 days

    Used by sustainability service to establish waste baseline
    for SDG 12.3 compliance tracking.

    Args:
        tenant_id: Tenant identifier passed to the repository.

    Returns:
        Dict with ``waste_percentage``, ``total_production_kg``,
        ``total_waste_kg``, ``period`` and ``data_available``. When the
        repository reports no baseline, a fixed 25% industry-average
        placeholder is returned with ``data_available`` set to ``False``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as db:
            from app.repositories.production_batch_repository import ProductionBatchRepository
            repo = ProductionBatchRepository(db)
            raw = await repo.get_baseline_metrics(tenant_id)

            if not raw['has_baseline']:
                # Not enough data yet - fall back to the industry figure
                baseline_data = {
                    'waste_percentage': 25.0,  # EU bakery industry average
                    'total_production_kg': 0,
                    'total_waste_kg': 0,
                    'period': {
                        'type': 'industry_average',
                        'note': 'Using EU bakery industry average of 25% as baseline'
                    },
                    'data_available': False
                }
            else:
                # Reshape the repository fields into the expected format
                baseline_data = {
                    'waste_percentage': raw['waste_percentage'],
                    'total_production_kg': raw['total_production'],
                    'total_waste_kg': raw['total_waste'],
                }
                period_start = raw['baseline_start'].isoformat() if raw['baseline_start'] else None
                period_end = raw['baseline_end'].isoformat() if raw['baseline_end'] else None
                baseline_data['period'] = {
                    'start_date': period_start,
                    'end_date': period_end,
                    'type': 'first_90_days'
                }
                baseline_data['data_available'] = True

            logger.info(
                "Baseline metrics retrieved",
                tenant_id=str(tenant_id),
                waste_percentage=baseline_data['waste_percentage'],
                data_available=baseline_data['data_available']
            )
            return baseline_data
    except Exception as exc:
        logger.error(
            "Error getting baseline metrics",
            tenant_id=str(tenant_id),
            error=str(exc)
        )
        raise
async def get_ai_waste_impact(
    self,
    tenant_id: UUID,
    start_date: datetime,
    end_date: datetime
) -> Dict[str, Any]:
    """
    Get AI impact on waste reduction

    Compares waste rates between AI-assisted and manual batches
    to demonstrate ROI of AI features for sustainability.

    Only batches of the tenant created between ``start_date`` and
    ``end_date`` with status COMPLETED or QUALITY_CHECK are counted. Waste is
    ``waste_quantity + defect_quantity`` and is expressed as a percentage of
    the planned quantity.

    Args:
        tenant_id: Tenant whose batches are analysed.
        start_date: Start of the analysis window (inclusive, SQL BETWEEN).
        end_date: End of the analysis window (inclusive, SQL BETWEEN).

    Returns:
        Dict with ``ai_batches``, ``manual_batches``, ``impact``,
        ``adoption`` and ``period`` sections.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        async with self.database_manager.get_session() as session:
            from sqlalchemy import text

            # Query for AI vs manual batch comparison
            query = text("""
                SELECT
                    -- AI-assisted batches
                    COUNT(CASE WHEN is_ai_assisted = true THEN 1 END) as ai_batches,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = true THEN planned_quantity ELSE 0 END), 0) as ai_planned,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = true THEN actual_quantity ELSE 0 END), 0) as ai_actual,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = true THEN waste_quantity ELSE 0 END), 0) as ai_waste,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = true THEN defect_quantity ELSE 0 END), 0) as ai_defects,
                    -- Manual batches
                    COUNT(CASE WHEN is_ai_assisted = false THEN 1 END) as manual_batches,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = false THEN planned_quantity ELSE 0 END), 0) as manual_planned,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = false THEN actual_quantity ELSE 0 END), 0) as manual_actual,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = false THEN waste_quantity ELSE 0 END), 0) as manual_waste,
                    COALESCE(SUM(CASE WHEN is_ai_assisted = false THEN defect_quantity ELSE 0 END), 0) as manual_defects
                FROM production_batches
                WHERE tenant_id = :tenant_id
                    AND created_at BETWEEN :start_date AND :end_date
                    AND status IN ('COMPLETED', 'QUALITY_CHECK')
            """)
            result = await session.execute(
                query,
                {
                    'tenant_id': tenant_id,
                    'start_date': start_date,
                    'end_date': end_date
                }
            )
            row = result.fetchone()

            # Normalise the aggregates once so the arithmetic below works on
            # plain Python numbers.
            ai_batches = int(row.ai_batches or 0)
            manual_batches = int(row.manual_batches or 0)
            ai_planned = float(row.ai_planned or 0)
            manual_planned = float(row.manual_planned or 0)

            # Calculate waste percentages
            ai_total_waste = float(row.ai_waste or 0) + float(row.ai_defects or 0)
            manual_total_waste = float(row.manual_waste or 0) + float(row.manual_defects or 0)
            ai_waste_pct = (ai_total_waste / ai_planned) * 100 if ai_planned > 0 else 0
            manual_waste_pct = (manual_total_waste / manual_planned) * 100 if manual_planned > 0 else 0

            # Reduction and avoided waste are only meaningful when both
            # groups have data; without AI production there is nothing to
            # compare, so no reduction is reported instead of a spurious 100%.
            waste_reduction_pct = 0
            waste_avoided_kg = 0
            if manual_waste_pct > 0 and ai_planned > 0:
                waste_reduction_pct = ((manual_waste_pct - ai_waste_pct) / manual_waste_pct) * 100
                # What the AI batches would have wasted at the manual rate,
                # minus what they actually wasted.
                waste_avoided_kg = (ai_planned * (manual_waste_pct / 100)) - ai_total_waste

            # Financial impact (€3.50/kg average waste cost)
            waste_cost_avoided = waste_avoided_kg * 3.50

            # Share of AI-assisted batches among all counted batches; the
            # zero-batch case is handled explicitly so the denominator is
            # never padded.
            total_batches = ai_batches + manual_batches
            ai_adoption_rate = (ai_batches / total_batches) * 100 if total_batches > 0 else 0

            ai_impact_data = {
                'ai_batches': {
                    'count': ai_batches,
                    'production_kg': ai_planned,
                    'waste_kg': ai_total_waste,
                    'waste_percentage': round(ai_waste_pct, 2)
                },
                'manual_batches': {
                    'count': manual_batches,
                    'production_kg': manual_planned,
                    'waste_kg': manual_total_waste,
                    'waste_percentage': round(manual_waste_pct, 2)
                },
                'impact': {
                    'waste_reduction_percentage': round(waste_reduction_pct, 1),
                    'waste_avoided_kg': round(waste_avoided_kg, 2),
                    'cost_savings_eur': round(waste_cost_avoided, 2),
                    # NOTE(review): the x12 projection presumes the queried
                    # window is about one month - confirm against callers.
                    'annual_projection_eur': round(waste_cost_avoided * 12, 2)
                },
                'adoption': {
                    'ai_adoption_rate': round(ai_adoption_rate, 1),
                    'recommendation': 'increase_ai_usage' if waste_reduction_pct > 10 else 'monitor'
                },
                'period': {
                    'start_date': start_date.isoformat(),
                    'end_date': end_date.isoformat()
                }
            }
            logger.info(
                "AI waste impact calculated",
                tenant_id=str(tenant_id),
                waste_reduction_pct=waste_reduction_pct,
                waste_avoided_kg=waste_avoided_kg
            )
            return ai_impact_data
    except Exception as e:
        logger.error(
            "Error calculating AI waste impact",
            tenant_id=str(tenant_id),
            error=str(e)
        )
        raise
# ================================================================
# NEW: ORCHESTRATOR INTEGRATION
# ================================================================
async def generate_production_schedule_from_forecast(
self,
tenant_id: UUID,
target_date: date,
forecasts: List[Dict[str, Any]],
planning_horizon_days: int = 1
) -> Dict[str, Any]:
"""
Generate production schedule from forecast data (called by Orchestrator)
This method receives forecast data from the Orchestrator and generates
a production schedule with production batches.
Args:
tenant_id: Tenant UUID
target_date: Target production date
forecasts: List of forecast data with product_id and predicted_demand
planning_horizon_days: Planning horizon (1-7 days)
Returns:
Dict with schedule_id, schedule_number, batches_created, etc.
"""
try:
logger.info("Generating production schedule from forecast",
tenant_id=str(tenant_id),
target_date=target_date,
forecasts_count=len(forecasts))
async with self.database_manager.get_session() as session:
schedule_repo = ProductionScheduleRepository(session)
batch_repo = ProductionBatchRepository(session)
# Generate schedule number
schedule_number = await schedule_repo.generate_schedule_number(tenant_id, target_date)
# Calculate production end date
production_end_date = target_date + timedelta(days=planning_horizon_days - 1)
# Create production schedule
schedule_data = {
'tenant_id': tenant_id,
'schedule_number': schedule_number,
'schedule_date': target_date,
'production_start_date': target_date,
'production_end_date': production_end_date,
'status': 'draft',
'total_batches': 0,
'completed_batches': 0,
'created_at': datetime.now(timezone.utc),
'updated_at': datetime.now(timezone.utc),
}
schedule = await schedule_repo.create_schedule(schedule_data)
# Create production batches from forecasts
batches_created = 0
total_planned_quantity = 0.0
warnings = []
for forecast in forecasts:
try:
product_id = UUID(forecast['product_id'])
predicted_demand = float(forecast['predicted_demand'])
# Get current stock level from inventory
stock_info = await self.inventory_client.get_stock_level(
str(tenant_id), str(product_id)
)
current_stock = stock_info.get('current_stock', 0) if stock_info else 0
# Calculate production quantity needed
# Production needed = Predicted demand - Current stock (if positive)
production_needed = max(0, predicted_demand - current_stock)
if production_needed <= 0:
logger.info("Skipping product - sufficient stock",
product_id=str(product_id),
current_stock=current_stock,
predicted_demand=predicted_demand)
warnings.append(f"Product {product_id}: sufficient stock, no production needed")
continue
# Get recipe for the product (if exists)
# Note: In a real scenario, we'd fetch recipe_id from product/inventory
# For now, we assume recipe_id = product_id or fetch from a mapping
# Generate reasoning data for JTBD dashboard
from shared.schemas.reasoning_types import (
create_production_batch_reasoning,
PredictionFactor,
PredictionFactorType
)
# Try to get product name from forecast, stock_info, or use placeholder
product_name = (
forecast.get('product_name') or
(stock_info.get('product_name') if stock_info else None) or
f"Product {product_id}"
)
# Calculate variance from historical average if available
historical_average = forecast.get('historical_average', predicted_demand * 0.8) # Default to 80% of predicted
variance_percent = ((predicted_demand - historical_average) / historical_average * 100) if historical_average > 0 else 0
# Create detailed factors for enhanced reasoning
factors = []
# Factor 1: Historical pattern (always present)
factors.append(
PredictionFactor(
factor=PredictionFactorType.HISTORICAL_PATTERN,
weight=0.40,
contribution=historical_average * 0.40,
description="Based on historical sales patterns",
historical_data={
"historical_average": historical_average,
"historical_period": "last_30_days"
},
confidence=0.90
)
)
# Factor 2: Weather impact (if weather data is available in forecast)
weather_impact = forecast.get('weather_impact')
if weather_impact:
weather_type = weather_impact.get('type', 'sunny')
weather_contribution = weather_impact.get('contribution', 0)
weather_weight = weather_impact.get('weight', 0.25)
# Map weather type to PredictionFactorType
weather_factor_map = {
'sunny': PredictionFactorType.WEATHER_SUNNY,
'rainy': PredictionFactorType.WEATHER_RAINY,
'cold': PredictionFactorType.WEATHER_COLD,
'hot': PredictionFactorType.WEATHER_HOT
}
weather_factor = weather_factor_map.get(weather_type, PredictionFactorType.WEATHER_SUNNY)
factors.append(
PredictionFactor(
factor=weather_factor,
weight=weather_weight,
contribution=weather_contribution,
description=f"Weather impact: {weather_type}",
weather_data={
"condition": weather_type,
"temperature": weather_impact.get('temperature', 22),
"impact_direction": weather_impact.get('impact_direction', 'positive')
},
confidence=weather_impact.get('confidence', 0.85)
)
)
# Factor 3: Weekend boost (if target date is weekend)
if target_date.weekday() >= 5: # Saturday (5) or Sunday (6)
weekend_contribution = predicted_demand * 0.20 # 20% boost
factors.append(
PredictionFactor(
factor=PredictionFactorType.WEEKEND_BOOST,
weight=0.20,
contribution=weekend_contribution,
description="Weekend demand increase",
confidence=0.80
)
)
# Factor 4: Inventory level consideration
inventory_weight = 0.15
inventory_contribution = current_stock * inventory_weight
factors.append(
PredictionFactor(
factor=PredictionFactorType.INVENTORY_LEVEL,
weight=inventory_weight,
contribution=inventory_contribution,
description="Current inventory consideration",
inventory_data={
"current_stock": current_stock,
"safety_stock_days": 3
},
confidence=0.95
)
)
# Use unified reasoning function - enhanced when factors exist, basic otherwise
if factors:
reasoning_data = create_production_batch_reasoning(
product_name=product_name,
predicted_demand=predicted_demand,
historical_average=historical_average,
variance_percent=variance_percent,
variance_reason="weather_sunny_weekend" if (target_date.weekday() >= 5 and weather_impact) else "historical_pattern",
confidence_score=forecast.get('confidence_score', 0.87),
factors=factors,
urgency_level="normal",
ready_by_time="08:00",
forecast_id=forecast.get('forecast_id')
)
else:
reasoning_data = create_production_batch_reasoning(
product_name=product_name,
predicted_demand=predicted_demand,
current_stock=current_stock,
production_needed=production_needed,
target_date=target_date.isoformat(),
confidence_score=forecast.get('confidence_score', 0.85)
)
# Create production batch
planned_start = datetime.combine(target_date, datetime.min.time())
planned_end = datetime.combine(target_date, datetime.max.time())
duration_minutes = int((planned_end - planned_start).total_seconds() / 60)
batch_data = {
'tenant_id': tenant_id,
'schedule_id': schedule.id,
'product_id': product_id, # Product ID from forecast
'product_name': product_name, # Product name resolved above
'recipe_id': product_id, # Assuming recipe_id matches product_id
'batch_number': await self._generate_batch_number(session, tenant_id, target_date, batches_created + 1),
'status': 'scheduled',
'priority': 'normal',
'planned_start_time': planned_start,
'planned_end_time': planned_end,
'planned_quantity': production_needed,
'planned_duration_minutes': duration_minutes,
'reasoning_data': reasoning_data, # NEW: Structured reasoning for i18n
'created_at': datetime.now(timezone.utc),
'updated_at': datetime.now(timezone.utc),
}
batch = await batch_repo.create_batch(batch_data)
batches_created += 1
total_planned_quantity += production_needed
logger.info("Production batch created from forecast",
batch_id=str(batch.id),
product_id=str(product_id),
planned_quantity=production_needed)
except Exception as e:
error_msg = f"Error creating batch for product {forecast.get('product_id')}: {str(e)}"
logger.warning(error_msg, tenant_id=str(tenant_id))
warnings.append(error_msg)
continue
# Update schedule with batch counts
await schedule_repo.update_schedule(
schedule.id,
tenant_id,
{'total_batches': batches_created}
)
logger.info("Production schedule generated successfully",
tenant_id=str(tenant_id),
schedule_id=str(schedule.id),
batches_created=batches_created)
return {
'schedule_id': schedule.id,
'schedule_number': schedule.schedule_number,
'batches_created': batches_created,
'total_planned_quantity': total_planned_quantity,
'warnings': warnings
}
except Exception as e:
logger.error("Error generating production schedule from forecast",
error=str(e), tenant_id=str(tenant_id))
raise
async def _generate_batch_number(
    self,
    session,
    tenant_id: UUID,
    target_date: date,
    batch_index: int
) -> str:
    """Build a batch identifier of the form ``BATCH-YYYYMMDD-NNN``.

    Args:
        session: Accepted for interface compatibility; not read by this body.
        tenant_id: Accepted for interface compatibility; not read by this body.
        target_date: Date whose year, month and day form the middle segment.
        batch_index: Sequence number rendered as the trailing segment,
            zero-padded to a minimum width of three digits.

    Returns:
        The formatted batch number string.
    """
    # The identifier is derived purely from the date and the supplied index;
    # no lookup is performed against the session or tenant.
    day_stamp = target_date.strftime("%Y%m%d")
    sequence = format(batch_index, "03d")
    return f"BATCH-{day_stamp}-{sequence}"