Add more services

Urtzi Alfaro
2025-08-21 20:28:14 +02:00
parent d6fd53e461
commit c6dd6fd1de
85 changed files with 17842 additions and 1828 deletions

View File: services/production/app/repositories/__init__.py

@@ -0,0 +1,20 @@
# ================================================================
# services/production/app/repositories/__init__.py
# ================================================================
"""
Repository layer for data access
"""
from .production_batch_repository import ProductionBatchRepository
from .production_schedule_repository import ProductionScheduleRepository
from .production_capacity_repository import ProductionCapacityRepository
from .quality_check_repository import QualityCheckRepository
from .production_alert_repository import ProductionAlertRepository
__all__ = [
"ProductionBatchRepository",
"ProductionScheduleRepository",
"ProductionCapacityRepository",
"QualityCheckRepository",
"ProductionAlertRepository"
]
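
For orientation, a minimal wiring sketch (not part of this commit) showing how these exports would be constructed per unit of work; the engine URL and factory name are illustrative:

# Hypothetical wiring: one AsyncSession per request, repositories built on top.
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from app.repositories import ProductionBatchRepository

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/production")  # illustrative DSN
SessionFactory = async_sessionmaker(engine, expire_on_commit=False)

async def list_active_batches(tenant_id: str):
    async with SessionFactory() as session:
        repo = ProductionBatchRepository(session)
        return await repo.get_active_batches(tenant_id)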

View File: services/production/app/repositories/base.py

@@ -0,0 +1,221 @@
"""
Base Repository for Production Service
Service-specific repository base class with production utilities
"""
from typing import Optional, List, Dict, Any, Type
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, date
import structlog
from shared.database.repository import BaseRepository
from shared.database.exceptions import DatabaseError
from shared.database.transactions import transactional
logger = structlog.get_logger()
class ProductionBaseRepository(BaseRepository):
"""Base repository for production service with common production operations"""
def __init__(self, model: Type, session: AsyncSession, cache_ttl: Optional[int] = 300):
# Production data is more dynamic, shorter cache time (5 minutes)
super().__init__(model, session, cache_ttl)
@transactional
async def get_by_tenant_id(self, tenant_id: str, skip: int = 0, limit: int = 100) -> List:
"""Get records by tenant ID"""
if hasattr(self.model, 'tenant_id'):
return await self.get_multi(
skip=skip,
limit=limit,
filters={"tenant_id": tenant_id},
order_by="created_at",
order_desc=True
)
return await self.get_multi(skip=skip, limit=limit)
@transactional
async def get_by_status(
self,
tenant_id: str,
status: str,
skip: int = 0,
limit: int = 100
) -> List:
"""Get records by tenant and status"""
if hasattr(self.model, 'status'):
return await self.get_multi(
skip=skip,
limit=limit,
filters={
"tenant_id": tenant_id,
"status": status
},
order_by="created_at",
order_desc=True
)
return await self.get_by_tenant_id(tenant_id, skip, limit)
@transactional
async def get_by_date_range(
self,
tenant_id: str,
start_date: date,
end_date: date,
date_field: str = "created_at",
skip: int = 0,
limit: int = 100
) -> List:
"""Get records by tenant and date range"""
try:
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
filters = {
"tenant_id": tenant_id,
f"{date_field}__gte": start_datetime,
f"{date_field}__lte": end_datetime
}
return await self.get_multi(
skip=skip,
limit=limit,
filters=filters,
order_by=date_field,
order_desc=True
)
except Exception as e:
logger.error("Error fetching records by date range",
error=str(e), tenant_id=tenant_id)
raise DatabaseError(f"Failed to fetch records by date range: {str(e)}")
@transactional
async def get_active_records(
self,
tenant_id: str,
active_field: str = "is_active",
skip: int = 0,
limit: int = 100
) -> List:
"""Get active records for a tenant"""
if hasattr(self.model, active_field):
return await self.get_multi(
skip=skip,
limit=limit,
filters={
"tenant_id": tenant_id,
active_field: True
},
order_by="created_at",
order_desc=True
)
return await self.get_by_tenant_id(tenant_id, skip, limit)
def _validate_production_data(
self,
data: Dict[str, Any],
required_fields: List[str]
) -> Dict[str, Any]:
"""Validate production data with required fields"""
errors = []
# Check required fields
for field in required_fields:
if field not in data or data[field] is None:
errors.append(f"Missing required field: {field}")
# Validate tenant_id format
if "tenant_id" in data:
try:
import uuid
uuid.UUID(str(data["tenant_id"]))
except (ValueError, TypeError):
errors.append("Invalid tenant_id format")
# Validate datetime fields
datetime_fields = ["planned_start_time", "planned_end_time", "actual_start_time", "actual_end_time"]
for field in datetime_fields:
if field in data and data[field] is not None:
if not isinstance(data[field], (datetime, str)):
errors.append(f"Invalid datetime format for {field}")
# Validate numeric fields
numeric_fields = ["planned_quantity", "actual_quantity", "quality_score", "yield_percentage"]
        for field in numeric_fields:
            if field in data and data[field] is not None:
                try:
                    value = float(data[field])
                    # Compare the converted value so string inputs such as "5"
                    # do not raise TypeError on the comparison
                    if value < 0:
                        errors.append(f"{field} cannot be negative")
                except (ValueError, TypeError):
                    errors.append(f"Invalid numeric value for {field}")
# Validate percentage fields (0-100)
percentage_fields = ["yield_percentage", "efficiency_percentage", "utilization_percentage"]
for field in percentage_fields:
if field in data and data[field] is not None:
try:
value = float(data[field])
if value < 0 or value > 100:
errors.append(f"{field} must be between 0 and 100")
                except (ValueError, TypeError):
                    # yield_percentage is already reported by the numeric check
                    # above; the percentage-only fields still need an error here
                    if field not in numeric_fields:
                        errors.append(f"Invalid numeric value for {field}")
return {
"is_valid": len(errors) == 0,
"errors": errors
}
async def get_production_statistics(
self,
tenant_id: str,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Get production statistics for a tenant and date range"""
        try:
            # Detailed statistics would be implemented per specific model;
            # the base implementation returns a simple record count
            records = await self.get_by_date_range(
                tenant_id, start_date, end_date, limit=1000
            )
return {
"total_records": len(records),
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating production statistics",
error=str(e), tenant_id=tenant_id)
raise DatabaseError(f"Failed to calculate statistics: {str(e)}")
async def check_duplicate(
self,
tenant_id: str,
unique_fields: Dict[str, Any]
) -> bool:
"""Check if a record with the same unique fields exists"""
try:
filters = {"tenant_id": tenant_id}
filters.update(unique_fields)
existing = await self.get_multi(
filters=filters,
limit=1
)
return len(existing) > 0
except Exception as e:
logger.error("Error checking for duplicates",
error=str(e), tenant_id=tenant_id)
return False
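
Two conventions the helpers above rely on are worth making explicit: get_multi (inherited from the shared BaseRepository) is assumed to accept Django-style filter suffixes such as field__gte, field__lte, and field__in, and _validate_production_data returns a result dict instead of raising, leaving the exception choice to the caller. A sketch of a subclass consuming both, reusing this module's imports; the ProductionRun model is hypothetical:

# Hypothetical subclass illustrating the base-class conventions.
class ProductionRunRepository(ProductionBaseRepository):
    def __init__(self, session: AsyncSession):
        # Inherit the 5-minute production cache default
        super().__init__(ProductionRun, session)

    async def create_run(self, data: Dict[str, Any]) -> "ProductionRun":
        # Validation reports problems rather than raising, so the caller
        # decides which exception type to surface
        result = self._validate_production_data(data, ["tenant_id", "planned_quantity"])
        if not result["is_valid"]:
            raise DatabaseError(f"Invalid run data: {result['errors']}")
        return await self.create(data)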

View File: services/production/app/repositories/production_alert_repository.py

@@ -0,0 +1,379 @@
"""
Production Alert Repository
Repository for production alert operations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta, date
from uuid import UUID
import structlog
from .base import ProductionBaseRepository
from app.models.production import ProductionAlert, AlertSeverity
from shared.database.exceptions import DatabaseError, ValidationError
from shared.database.transactions import transactional
logger = structlog.get_logger()
class ProductionAlertRepository(ProductionBaseRepository):
"""Repository for production alert operations"""
def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 60):
# Alerts are very dynamic, very short cache time (1 minute)
super().__init__(ProductionAlert, session, cache_ttl)
@transactional
async def create_alert(self, alert_data: Dict[str, Any]) -> ProductionAlert:
"""Create a new production alert with validation"""
try:
# Validate alert data
validation_result = self._validate_production_data(
alert_data,
["tenant_id", "alert_type", "title", "message"]
)
if not validation_result["is_valid"]:
raise ValidationError(f"Invalid alert data: {validation_result['errors']}")
# Set default values
if "severity" not in alert_data:
alert_data["severity"] = AlertSeverity.MEDIUM
if "source_system" not in alert_data:
alert_data["source_system"] = "production"
if "is_active" not in alert_data:
alert_data["is_active"] = True
if "is_acknowledged" not in alert_data:
alert_data["is_acknowledged"] = False
if "is_resolved" not in alert_data:
alert_data["is_resolved"] = False
# Create alert
alert = await self.create(alert_data)
logger.info("Production alert created successfully",
alert_id=str(alert.id),
alert_type=alert.alert_type,
severity=alert.severity.value if alert.severity else None,
tenant_id=str(alert.tenant_id))
return alert
except ValidationError:
raise
except Exception as e:
logger.error("Error creating production alert", error=str(e))
raise DatabaseError(f"Failed to create production alert: {str(e)}")
@transactional
async def get_active_alerts(
self,
tenant_id: str,
severity: Optional[AlertSeverity] = None
) -> List[ProductionAlert]:
"""Get active production alerts for a tenant"""
try:
filters = {
"tenant_id": tenant_id,
"is_active": True,
"is_resolved": False
}
if severity:
filters["severity"] = severity
alerts = await self.get_multi(
filters=filters,
order_by="created_at",
order_desc=True
)
logger.info("Retrieved active production alerts",
count=len(alerts),
severity=severity.value if severity else "all",
tenant_id=tenant_id)
return alerts
except Exception as e:
logger.error("Error fetching active alerts", error=str(e))
raise DatabaseError(f"Failed to fetch active alerts: {str(e)}")
@transactional
async def get_alerts_by_type(
self,
tenant_id: str,
alert_type: str,
include_resolved: bool = False
) -> List[ProductionAlert]:
"""Get production alerts by type"""
try:
filters = {
"tenant_id": tenant_id,
"alert_type": alert_type
}
if not include_resolved:
filters["is_resolved"] = False
alerts = await self.get_multi(
filters=filters,
order_by="created_at",
order_desc=True
)
logger.info("Retrieved alerts by type",
count=len(alerts),
alert_type=alert_type,
include_resolved=include_resolved,
tenant_id=tenant_id)
return alerts
except Exception as e:
logger.error("Error fetching alerts by type", error=str(e))
raise DatabaseError(f"Failed to fetch alerts by type: {str(e)}")
@transactional
async def get_alerts_by_batch(
self,
tenant_id: str,
batch_id: str
) -> List[ProductionAlert]:
"""Get production alerts for a specific batch"""
try:
alerts = await self.get_multi(
filters={
"tenant_id": tenant_id,
"batch_id": batch_id
},
order_by="created_at",
order_desc=True
)
logger.info("Retrieved alerts by batch",
count=len(alerts),
batch_id=batch_id,
tenant_id=tenant_id)
return alerts
except Exception as e:
logger.error("Error fetching alerts by batch", error=str(e))
raise DatabaseError(f"Failed to fetch alerts by batch: {str(e)}")
@transactional
async def acknowledge_alert(
self,
alert_id: UUID,
acknowledged_by: str,
acknowledgment_notes: Optional[str] = None
) -> ProductionAlert:
"""Acknowledge a production alert"""
try:
alert = await self.get(alert_id)
if not alert:
raise ValidationError(f"Alert {alert_id} not found")
if alert.is_acknowledged:
raise ValidationError("Alert is already acknowledged")
update_data = {
"is_acknowledged": True,
"acknowledged_by": acknowledged_by,
"acknowledged_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
            if acknowledgment_notes:
                # Copy the list so SQLAlchemy registers the JSON column change;
                # in-place mutation of the loaded value is not tracked by default
                current_actions = list(alert.actions_taken or [])
                current_actions.append({
                    "action": "acknowledged",
                    "by": acknowledged_by,
                    "at": datetime.utcnow().isoformat(),
                    "notes": acknowledgment_notes
                })
                update_data["actions_taken"] = current_actions
alert = await self.update(alert_id, update_data)
logger.info("Acknowledged production alert",
alert_id=str(alert_id),
acknowledged_by=acknowledged_by)
return alert
except ValidationError:
raise
except Exception as e:
logger.error("Error acknowledging alert", error=str(e))
raise DatabaseError(f"Failed to acknowledge alert: {str(e)}")
@transactional
async def resolve_alert(
self,
alert_id: UUID,
resolved_by: str,
resolution_notes: str
) -> ProductionAlert:
"""Resolve a production alert"""
try:
alert = await self.get(alert_id)
if not alert:
raise ValidationError(f"Alert {alert_id} not found")
if alert.is_resolved:
raise ValidationError("Alert is already resolved")
update_data = {
"is_resolved": True,
"is_active": False,
"resolved_by": resolved_by,
"resolved_at": datetime.utcnow(),
"resolution_notes": resolution_notes,
"updated_at": datetime.utcnow()
}
            # Add to actions taken (copied so the JSON column change is tracked)
            current_actions = list(alert.actions_taken or [])
            current_actions.append({
                "action": "resolved",
                "by": resolved_by,
                "at": datetime.utcnow().isoformat(),
                "notes": resolution_notes
            })
            update_data["actions_taken"] = current_actions
alert = await self.update(alert_id, update_data)
logger.info("Resolved production alert",
alert_id=str(alert_id),
resolved_by=resolved_by)
return alert
except ValidationError:
raise
except Exception as e:
logger.error("Error resolving alert", error=str(e))
raise DatabaseError(f"Failed to resolve alert: {str(e)}")
@transactional
async def get_alert_statistics(
self,
tenant_id: str,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Get alert statistics for a tenant and date range"""
try:
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
alerts = await self.get_multi(
filters={
"tenant_id": tenant_id,
"created_at__gte": start_datetime,
"created_at__lte": end_datetime
}
)
total_alerts = len(alerts)
active_alerts = len([a for a in alerts if a.is_active])
acknowledged_alerts = len([a for a in alerts if a.is_acknowledged])
resolved_alerts = len([a for a in alerts if a.is_resolved])
# Group by severity
by_severity = {}
for severity in AlertSeverity:
severity_alerts = [a for a in alerts if a.severity == severity]
by_severity[severity.value] = {
"total": len(severity_alerts),
"active": len([a for a in severity_alerts if a.is_active]),
"resolved": len([a for a in severity_alerts if a.is_resolved])
}
# Group by alert type
by_type = {}
for alert in alerts:
alert_type = alert.alert_type
if alert_type not in by_type:
by_type[alert_type] = {
"total": 0,
"active": 0,
"resolved": 0
}
by_type[alert_type]["total"] += 1
if alert.is_active:
by_type[alert_type]["active"] += 1
if alert.is_resolved:
by_type[alert_type]["resolved"] += 1
# Calculate resolution time statistics
resolved_with_times = [
a for a in alerts
if a.is_resolved and a.resolved_at and a.created_at
]
resolution_times = []
for alert in resolved_with_times:
resolution_time = (alert.resolved_at - alert.created_at).total_seconds() / 3600 # hours
resolution_times.append(resolution_time)
avg_resolution_time = sum(resolution_times) / len(resolution_times) if resolution_times else 0
return {
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"total_alerts": total_alerts,
"active_alerts": active_alerts,
"acknowledged_alerts": acknowledged_alerts,
"resolved_alerts": resolved_alerts,
"acknowledgment_rate": round((acknowledged_alerts / total_alerts * 100) if total_alerts > 0 else 0, 2),
"resolution_rate": round((resolved_alerts / total_alerts * 100) if total_alerts > 0 else 0, 2),
"average_resolution_time_hours": round(avg_resolution_time, 2),
"by_severity": by_severity,
"by_alert_type": by_type,
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating alert statistics", error=str(e))
raise DatabaseError(f"Failed to calculate alert statistics: {str(e)}")
@transactional
async def cleanup_old_resolved_alerts(
self,
tenant_id: str,
days_to_keep: int = 30
) -> int:
"""Clean up old resolved alerts"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
old_alerts = await self.get_multi(
filters={
"tenant_id": tenant_id,
"is_resolved": True,
"resolved_at__lt": cutoff_date
}
)
deleted_count = 0
for alert in old_alerts:
await self.delete(alert.id)
deleted_count += 1
logger.info("Cleaned up old resolved alerts",
deleted_count=deleted_count,
days_to_keep=days_to_keep,
tenant_id=tenant_id)
return deleted_count
except Exception as e:
logger.error("Error cleaning up old alerts", error=str(e))
raise DatabaseError(f"Failed to clean up old alerts: {str(e)}")
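
A short usage sketch of the alert lifecycle defined above, reusing this module's imports; the alert type, titles, and user names are illustrative:

async def handle_oven_fault(session: AsyncSession, tenant_id: str) -> None:
    repo = ProductionAlertRepository(session)
    alert = await repo.create_alert({
        "tenant_id": tenant_id,
        "alert_type": "equipment_fault",  # illustrative free-form type
        "title": "Oven 2 over temperature",
        "message": "Temperature exceeded the configured bake threshold",
        "severity": AlertSeverity.HIGH,
    })
    # Both steps append an entry to the alert's actions_taken history
    await repo.acknowledge_alert(alert.id, "operator_7", "Investigating on site")
    await repo.resolve_alert(alert.id, "operator_7", "Thermostat recalibrated")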

View File: services/production/app/repositories/production_batch_repository.py

@@ -0,0 +1,346 @@
"""
Production Batch Repository
Repository for production batch operations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta, date
from uuid import UUID
import structlog
from .base import ProductionBaseRepository
from app.models.production import ProductionBatch, ProductionStatus, ProductionPriority
from shared.database.exceptions import DatabaseError, ValidationError
from shared.database.transactions import transactional
logger = structlog.get_logger()
class ProductionBatchRepository(ProductionBaseRepository):
"""Repository for production batch operations"""
def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 300):
# Production batches are dynamic, short cache time (5 minutes)
super().__init__(ProductionBatch, session, cache_ttl)
@transactional
async def create_batch(self, batch_data: Dict[str, Any]) -> ProductionBatch:
"""Create a new production batch with validation"""
try:
# Validate batch data
validation_result = self._validate_production_data(
batch_data,
["tenant_id", "product_id", "product_name", "planned_start_time",
"planned_end_time", "planned_quantity", "planned_duration_minutes"]
)
if not validation_result["is_valid"]:
raise ValidationError(f"Invalid batch data: {validation_result['errors']}")
# Generate batch number if not provided
if "batch_number" not in batch_data or not batch_data["batch_number"]:
batch_data["batch_number"] = await self._generate_batch_number(
batch_data["tenant_id"]
)
# Set default values
if "status" not in batch_data:
batch_data["status"] = ProductionStatus.PENDING
if "priority" not in batch_data:
batch_data["priority"] = ProductionPriority.MEDIUM
if "is_rush_order" not in batch_data:
batch_data["is_rush_order"] = False
if "is_special_recipe" not in batch_data:
batch_data["is_special_recipe"] = False
# Check for duplicate batch number
if await self.check_duplicate(batch_data["tenant_id"], {"batch_number": batch_data["batch_number"]}):
raise ValidationError(f"Batch number {batch_data['batch_number']} already exists")
# Create batch
batch = await self.create(batch_data)
logger.info("Production batch created successfully",
batch_id=str(batch.id),
batch_number=batch.batch_number,
tenant_id=str(batch.tenant_id))
return batch
except ValidationError:
raise
except Exception as e:
logger.error("Error creating production batch", error=str(e))
raise DatabaseError(f"Failed to create production batch: {str(e)}")
@transactional
async def get_active_batches(self, tenant_id: str) -> List[ProductionBatch]:
"""Get active production batches for a tenant"""
try:
active_statuses = [
ProductionStatus.PENDING,
ProductionStatus.IN_PROGRESS,
ProductionStatus.QUALITY_CHECK,
ProductionStatus.ON_HOLD
]
batches = await self.get_multi(
filters={
"tenant_id": tenant_id,
"status__in": active_statuses
},
order_by="planned_start_time"
)
logger.info("Retrieved active production batches",
count=len(batches),
tenant_id=tenant_id)
return batches
except Exception as e:
logger.error("Error fetching active batches", error=str(e))
raise DatabaseError(f"Failed to fetch active batches: {str(e)}")
@transactional
async def get_batches_by_date_range(
self,
tenant_id: str,
start_date: date,
end_date: date,
status: Optional[ProductionStatus] = None
) -> List[ProductionBatch]:
"""Get production batches within a date range"""
try:
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
filters = {
"tenant_id": tenant_id,
"planned_start_time__gte": start_datetime,
"planned_start_time__lte": end_datetime
}
if status:
filters["status"] = status
batches = await self.get_multi(
filters=filters,
order_by="planned_start_time"
)
logger.info("Retrieved batches by date range",
count=len(batches),
start_date=start_date.isoformat(),
end_date=end_date.isoformat(),
tenant_id=tenant_id)
return batches
except Exception as e:
logger.error("Error fetching batches by date range", error=str(e))
raise DatabaseError(f"Failed to fetch batches by date range: {str(e)}")
@transactional
async def get_batches_by_product(
self,
tenant_id: str,
product_id: str,
limit: int = 50
) -> List[ProductionBatch]:
"""Get production batches for a specific product"""
try:
batches = await self.get_multi(
filters={
"tenant_id": tenant_id,
"product_id": product_id
},
order_by="created_at",
order_desc=True,
limit=limit
)
logger.info("Retrieved batches by product",
count=len(batches),
product_id=product_id,
tenant_id=tenant_id)
return batches
except Exception as e:
logger.error("Error fetching batches by product", error=str(e))
raise DatabaseError(f"Failed to fetch batches by product: {str(e)}")
@transactional
async def update_batch_status(
self,
batch_id: UUID,
status: ProductionStatus,
actual_quantity: Optional[float] = None,
notes: Optional[str] = None
) -> ProductionBatch:
"""Update production batch status"""
try:
batch = await self.get(batch_id)
if not batch:
raise ValidationError(f"Batch {batch_id} not found")
update_data = {
"status": status,
"updated_at": datetime.utcnow()
}
# Set completion time if completed
if status == ProductionStatus.COMPLETED:
update_data["completed_at"] = datetime.utcnow()
update_data["actual_end_time"] = datetime.utcnow()
if actual_quantity is not None:
update_data["actual_quantity"] = actual_quantity
# Calculate yield percentage
if batch.planned_quantity > 0:
update_data["yield_percentage"] = (actual_quantity / batch.planned_quantity) * 100
# Set start time if starting production
if status == ProductionStatus.IN_PROGRESS and not batch.actual_start_time:
update_data["actual_start_time"] = datetime.utcnow()
# Add notes
if notes:
if status == ProductionStatus.CANCELLED:
update_data["cancellation_reason"] = notes
elif status == ProductionStatus.ON_HOLD:
update_data["delay_reason"] = notes
else:
update_data["production_notes"] = notes
batch = await self.update(batch_id, update_data)
logger.info("Updated batch status",
batch_id=str(batch_id),
new_status=status.value,
actual_quantity=actual_quantity)
return batch
except ValidationError:
raise
except Exception as e:
logger.error("Error updating batch status", error=str(e))
raise DatabaseError(f"Failed to update batch status: {str(e)}")
@transactional
async def get_production_metrics(
self,
tenant_id: str,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Get production metrics for a tenant and date range"""
try:
batches = await self.get_batches_by_date_range(tenant_id, start_date, end_date)
total_batches = len(batches)
completed_batches = len([b for b in batches if b.status == ProductionStatus.COMPLETED])
in_progress_batches = len([b for b in batches if b.status == ProductionStatus.IN_PROGRESS])
cancelled_batches = len([b for b in batches if b.status == ProductionStatus.CANCELLED])
# Calculate totals
total_planned_quantity = sum(b.planned_quantity for b in batches)
total_actual_quantity = sum(b.actual_quantity or 0 for b in batches if b.actual_quantity)
# Calculate average yield
completed_with_yield = [b for b in batches if b.yield_percentage is not None]
avg_yield = (
sum(b.yield_percentage for b in completed_with_yield) / len(completed_with_yield)
if completed_with_yield else 0
)
# Calculate on-time completion rate
on_time_completed = len([
b for b in batches
if b.status == ProductionStatus.COMPLETED
and b.actual_end_time
and b.planned_end_time
and b.actual_end_time <= b.planned_end_time
])
on_time_rate = (on_time_completed / completed_batches * 100) if completed_batches > 0 else 0
return {
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"total_batches": total_batches,
"completed_batches": completed_batches,
"in_progress_batches": in_progress_batches,
"cancelled_batches": cancelled_batches,
"completion_rate": (completed_batches / total_batches * 100) if total_batches > 0 else 0,
"total_planned_quantity": total_planned_quantity,
"total_actual_quantity": total_actual_quantity,
"average_yield_percentage": round(avg_yield, 2),
"on_time_completion_rate": round(on_time_rate, 2),
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating production metrics", error=str(e))
raise DatabaseError(f"Failed to calculate production metrics: {str(e)}")
@transactional
async def get_urgent_batches(self, tenant_id: str, hours_ahead: int = 4) -> List[ProductionBatch]:
"""Get batches that need to start within the specified hours"""
try:
cutoff_time = datetime.utcnow() + timedelta(hours=hours_ahead)
batches = await self.get_multi(
filters={
"tenant_id": tenant_id,
"status": ProductionStatus.PENDING,
"planned_start_time__lte": cutoff_time
},
order_by="planned_start_time"
)
logger.info("Retrieved urgent batches",
count=len(batches),
hours_ahead=hours_ahead,
tenant_id=tenant_id)
return batches
except Exception as e:
logger.error("Error fetching urgent batches", error=str(e))
raise DatabaseError(f"Failed to fetch urgent batches: {str(e)}")
async def _generate_batch_number(self, tenant_id: str) -> str:
"""Generate a unique batch number"""
try:
# Get current date for prefix
today = datetime.utcnow().date()
date_prefix = today.strftime("%Y%m%d")
# Count batches created today
today_start = datetime.combine(today, datetime.min.time())
today_end = datetime.combine(today, datetime.max.time())
daily_batches = await self.get_multi(
filters={
"tenant_id": tenant_id,
"created_at__gte": today_start,
"created_at__lte": today_end
}
)
            # Generate sequential number (best effort: concurrent creates could
            # produce the same sequence, which the duplicate check in
            # create_batch then rejects)
            sequence = len(daily_batches) + 1
            batch_number = f"PROD-{date_prefix}-{sequence:03d}"
            return batch_number
return batch_number
except Exception as e:
logger.error("Error generating batch number", error=str(e))
# Fallback to timestamp-based number
timestamp = int(datetime.utcnow().timestamp())
return f"PROD-{timestamp}"
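
A sketch of the batch lifecycle this repository drives, from creation through completion; the product details and quantities are illustrative:

from datetime import datetime, timedelta

async def run_batch(session: AsyncSession, tenant_id: str, product_id: str) -> None:
    repo = ProductionBatchRepository(session)
    start = datetime.utcnow() + timedelta(hours=1)
    batch = await repo.create_batch({
        "tenant_id": tenant_id,
        "product_id": product_id,
        "product_name": "Sourdough loaf",
        "planned_start_time": start,
        "planned_end_time": start + timedelta(hours=3),
        "planned_quantity": 120,
        "planned_duration_minutes": 180,
        # batch_number omitted: a PROD-YYYYMMDD-NNN number is generated
    })
    await repo.update_batch_status(batch.id, ProductionStatus.IN_PROGRESS)
    # Passing actual_quantity on completion also records yield_percentage
    await repo.update_batch_status(
        batch.id, ProductionStatus.COMPLETED, actual_quantity=114
    )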

View File: services/production/app/repositories/production_capacity_repository.py

@@ -0,0 +1,341 @@
"""
Production Capacity Repository
Repository for production capacity operations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, date
from uuid import UUID
import structlog
from .base import ProductionBaseRepository
from app.models.production import ProductionCapacity
from shared.database.exceptions import DatabaseError, ValidationError
from shared.database.transactions import transactional
logger = structlog.get_logger()
class ProductionCapacityRepository(ProductionBaseRepository):
"""Repository for production capacity operations"""
def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 600):
# Capacity data changes moderately, medium cache time (10 minutes)
super().__init__(ProductionCapacity, session, cache_ttl)
@transactional
async def create_capacity(self, capacity_data: Dict[str, Any]) -> ProductionCapacity:
"""Create a new production capacity entry with validation"""
try:
# Validate capacity data
validation_result = self._validate_production_data(
capacity_data,
["tenant_id", "resource_type", "resource_id", "resource_name",
"date", "start_time", "end_time", "total_capacity_units"]
)
if not validation_result["is_valid"]:
raise ValidationError(f"Invalid capacity data: {validation_result['errors']}")
# Set default values
if "allocated_capacity_units" not in capacity_data:
capacity_data["allocated_capacity_units"] = 0.0
if "remaining_capacity_units" not in capacity_data:
capacity_data["remaining_capacity_units"] = capacity_data["total_capacity_units"]
if "is_available" not in capacity_data:
capacity_data["is_available"] = True
if "is_maintenance" not in capacity_data:
capacity_data["is_maintenance"] = False
if "is_reserved" not in capacity_data:
capacity_data["is_reserved"] = False
# Create capacity entry
capacity = await self.create(capacity_data)
logger.info("Production capacity created successfully",
capacity_id=str(capacity.id),
resource_type=capacity.resource_type,
resource_id=capacity.resource_id,
tenant_id=str(capacity.tenant_id))
return capacity
except ValidationError:
raise
except Exception as e:
logger.error("Error creating production capacity", error=str(e))
raise DatabaseError(f"Failed to create production capacity: {str(e)}")
@transactional
async def get_capacity_by_resource(
self,
tenant_id: str,
resource_id: str,
date_filter: Optional[date] = None
) -> List[ProductionCapacity]:
"""Get capacity entries for a specific resource"""
try:
filters = {
"tenant_id": tenant_id,
"resource_id": resource_id
}
if date_filter:
filters["date"] = date_filter
capacities = await self.get_multi(
filters=filters,
order_by="start_time"
)
logger.info("Retrieved capacity by resource",
count=len(capacities),
resource_id=resource_id,
tenant_id=tenant_id)
return capacities
except Exception as e:
logger.error("Error fetching capacity by resource", error=str(e))
raise DatabaseError(f"Failed to fetch capacity by resource: {str(e)}")
@transactional
async def get_available_capacity(
self,
tenant_id: str,
resource_type: str,
target_date: date,
required_capacity: float
) -> List[ProductionCapacity]:
"""Get available capacity for a specific date and capacity requirement"""
try:
capacities = await self.get_multi(
filters={
"tenant_id": tenant_id,
"resource_type": resource_type,
"date": target_date,
"is_available": True,
"is_maintenance": False,
"remaining_capacity_units__gte": required_capacity
},
order_by="remaining_capacity_units",
order_desc=True
)
logger.info("Retrieved available capacity",
count=len(capacities),
resource_type=resource_type,
required_capacity=required_capacity,
tenant_id=tenant_id)
return capacities
except Exception as e:
logger.error("Error fetching available capacity", error=str(e))
raise DatabaseError(f"Failed to fetch available capacity: {str(e)}")
@transactional
async def allocate_capacity(
self,
capacity_id: UUID,
allocation_amount: float,
allocation_notes: Optional[str] = None
) -> ProductionCapacity:
"""Allocate capacity units from a capacity entry"""
try:
capacity = await self.get(capacity_id)
if not capacity:
raise ValidationError(f"Capacity {capacity_id} not found")
if allocation_amount > capacity.remaining_capacity_units:
raise ValidationError(
f"Insufficient capacity: requested {allocation_amount}, "
f"available {capacity.remaining_capacity_units}"
)
new_allocated = capacity.allocated_capacity_units + allocation_amount
new_remaining = capacity.remaining_capacity_units - allocation_amount
update_data = {
"allocated_capacity_units": new_allocated,
"remaining_capacity_units": new_remaining,
"updated_at": datetime.utcnow()
}
if allocation_notes:
current_notes = capacity.notes or ""
update_data["notes"] = f"{current_notes}\n{allocation_notes}".strip()
capacity = await self.update(capacity_id, update_data)
logger.info("Allocated capacity",
capacity_id=str(capacity_id),
allocation_amount=allocation_amount,
remaining_capacity=new_remaining)
return capacity
except ValidationError:
raise
except Exception as e:
logger.error("Error allocating capacity", error=str(e))
raise DatabaseError(f"Failed to allocate capacity: {str(e)}")
@transactional
async def release_capacity(
self,
capacity_id: UUID,
release_amount: float,
release_notes: Optional[str] = None
) -> ProductionCapacity:
"""Release allocated capacity units back to a capacity entry"""
try:
capacity = await self.get(capacity_id)
if not capacity:
raise ValidationError(f"Capacity {capacity_id} not found")
if release_amount > capacity.allocated_capacity_units:
raise ValidationError(
f"Cannot release more than allocated: requested {release_amount}, "
f"allocated {capacity.allocated_capacity_units}"
)
new_allocated = capacity.allocated_capacity_units - release_amount
new_remaining = capacity.remaining_capacity_units + release_amount
update_data = {
"allocated_capacity_units": new_allocated,
"remaining_capacity_units": new_remaining,
"updated_at": datetime.utcnow()
}
if release_notes:
current_notes = capacity.notes or ""
update_data["notes"] = f"{current_notes}\n{release_notes}".strip()
capacity = await self.update(capacity_id, update_data)
logger.info("Released capacity",
capacity_id=str(capacity_id),
release_amount=release_amount,
remaining_capacity=new_remaining)
return capacity
except ValidationError:
raise
except Exception as e:
logger.error("Error releasing capacity", error=str(e))
raise DatabaseError(f"Failed to release capacity: {str(e)}")
@transactional
async def get_capacity_utilization_summary(
self,
tenant_id: str,
start_date: date,
end_date: date,
resource_type: Optional[str] = None
) -> Dict[str, Any]:
"""Get capacity utilization summary for a date range"""
try:
filters = {
"tenant_id": tenant_id,
"date__gte": start_date,
"date__lte": end_date
}
if resource_type:
filters["resource_type"] = resource_type
capacities = await self.get_multi(filters=filters)
total_capacity = sum(c.total_capacity_units for c in capacities)
total_allocated = sum(c.allocated_capacity_units for c in capacities)
total_available = sum(c.remaining_capacity_units for c in capacities)
# Group by resource type
by_resource_type = {}
for capacity in capacities:
rt = capacity.resource_type
if rt not in by_resource_type:
by_resource_type[rt] = {
"total_capacity": 0,
"allocated_capacity": 0,
"available_capacity": 0,
"resource_count": 0
}
by_resource_type[rt]["total_capacity"] += capacity.total_capacity_units
by_resource_type[rt]["allocated_capacity"] += capacity.allocated_capacity_units
by_resource_type[rt]["available_capacity"] += capacity.remaining_capacity_units
by_resource_type[rt]["resource_count"] += 1
# Calculate utilization percentages
for rt_data in by_resource_type.values():
if rt_data["total_capacity"] > 0:
rt_data["utilization_percentage"] = round(
(rt_data["allocated_capacity"] / rt_data["total_capacity"]) * 100, 2
)
else:
rt_data["utilization_percentage"] = 0
return {
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"total_capacity_units": total_capacity,
"total_allocated_units": total_allocated,
"total_available_units": total_available,
"overall_utilization_percentage": round(
(total_allocated / total_capacity * 100) if total_capacity > 0 else 0, 2
),
"by_resource_type": by_resource_type,
"total_resources": len(capacities),
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating capacity utilization summary", error=str(e))
raise DatabaseError(f"Failed to calculate capacity utilization summary: {str(e)}")
@transactional
async def set_maintenance_mode(
self,
capacity_id: UUID,
is_maintenance: bool,
maintenance_notes: Optional[str] = None
) -> ProductionCapacity:
"""Set maintenance mode for a capacity entry"""
try:
capacity = await self.get(capacity_id)
if not capacity:
raise ValidationError(f"Capacity {capacity_id} not found")
update_data = {
"is_maintenance": is_maintenance,
"is_available": not is_maintenance, # Not available when in maintenance
"updated_at": datetime.utcnow()
}
if is_maintenance:
update_data["maintenance_status"] = "in_maintenance"
if maintenance_notes:
update_data["notes"] = maintenance_notes
else:
update_data["maintenance_status"] = "operational"
update_data["last_maintenance_date"] = datetime.utcnow()
capacity = await self.update(capacity_id, update_data)
logger.info("Set maintenance mode",
capacity_id=str(capacity_id),
is_maintenance=is_maintenance)
return capacity
except ValidationError:
raise
except Exception as e:
logger.error("Error setting maintenance mode", error=str(e))
raise DatabaseError(f"Failed to set maintenance mode: {str(e)}")
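
A sketch of the allocate/release pairing above, reusing this module's imports; the resource type and unit amounts are illustrative:

async def reserve_mixer_capacity(session: AsyncSession, tenant_id: str, target: date) -> None:
    repo = ProductionCapacityRepository(session)
    # Entries with at least 4 free units, largest remaining capacity first
    candidates = await repo.get_available_capacity(
        tenant_id, "mixer", target, required_capacity=4.0
    )
    if not candidates:
        return  # nothing free; the caller would try another day
    entry = await repo.allocate_capacity(candidates[0].id, 4.0, "Reserved for morning bake")
    # If the work is cancelled, release returns the units to the pool
    await repo.release_capacity(entry.id, 4.0, "Morning bake cancelled")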

View File: services/production/app/repositories/production_schedule_repository.py

@@ -0,0 +1,279 @@
"""
Production Schedule Repository
Repository for production schedule operations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, date
from uuid import UUID
import structlog
from .base import ProductionBaseRepository
from app.models.production import ProductionSchedule
from shared.database.exceptions import DatabaseError, ValidationError
from shared.database.transactions import transactional
logger = structlog.get_logger()
class ProductionScheduleRepository(ProductionBaseRepository):
"""Repository for production schedule operations"""
def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 600):
# Schedules are more stable, medium cache time (10 minutes)
super().__init__(ProductionSchedule, session, cache_ttl)
@transactional
async def create_schedule(self, schedule_data: Dict[str, Any]) -> ProductionSchedule:
"""Create a new production schedule with validation"""
try:
# Validate schedule data
validation_result = self._validate_production_data(
schedule_data,
["tenant_id", "schedule_date", "shift_start", "shift_end",
"total_capacity_hours", "planned_capacity_hours", "staff_count"]
)
if not validation_result["is_valid"]:
raise ValidationError(f"Invalid schedule data: {validation_result['errors']}")
# Set default values
if "is_finalized" not in schedule_data:
schedule_data["is_finalized"] = False
if "is_active" not in schedule_data:
schedule_data["is_active"] = True
if "overtime_hours" not in schedule_data:
schedule_data["overtime_hours"] = 0.0
# Validate date uniqueness
existing_schedule = await self.get_schedule_by_date(
schedule_data["tenant_id"],
schedule_data["schedule_date"]
)
if existing_schedule:
raise ValidationError(f"Schedule for date {schedule_data['schedule_date']} already exists")
# Create schedule
schedule = await self.create(schedule_data)
logger.info("Production schedule created successfully",
schedule_id=str(schedule.id),
schedule_date=schedule.schedule_date.isoformat(),
tenant_id=str(schedule.tenant_id))
return schedule
except ValidationError:
raise
except Exception as e:
logger.error("Error creating production schedule", error=str(e))
raise DatabaseError(f"Failed to create production schedule: {str(e)}")
@transactional
async def get_schedule_by_date(
self,
tenant_id: str,
schedule_date: date
) -> Optional[ProductionSchedule]:
"""Get production schedule for a specific date"""
try:
schedules = await self.get_multi(
filters={
"tenant_id": tenant_id,
"schedule_date": schedule_date
},
limit=1
)
schedule = schedules[0] if schedules else None
if schedule:
logger.info("Retrieved production schedule by date",
schedule_id=str(schedule.id),
schedule_date=schedule_date.isoformat(),
tenant_id=tenant_id)
return schedule
except Exception as e:
logger.error("Error fetching schedule by date", error=str(e))
raise DatabaseError(f"Failed to fetch schedule by date: {str(e)}")
@transactional
async def get_schedules_by_date_range(
self,
tenant_id: str,
start_date: date,
end_date: date
) -> List[ProductionSchedule]:
"""Get production schedules within a date range"""
try:
schedules = await self.get_multi(
filters={
"tenant_id": tenant_id,
"schedule_date__gte": start_date,
"schedule_date__lte": end_date
},
order_by="schedule_date"
)
logger.info("Retrieved schedules by date range",
count=len(schedules),
start_date=start_date.isoformat(),
end_date=end_date.isoformat(),
tenant_id=tenant_id)
return schedules
except Exception as e:
logger.error("Error fetching schedules by date range", error=str(e))
raise DatabaseError(f"Failed to fetch schedules by date range: {str(e)}")
@transactional
async def get_active_schedules(self, tenant_id: str) -> List[ProductionSchedule]:
"""Get active production schedules for a tenant"""
try:
schedules = await self.get_multi(
filters={
"tenant_id": tenant_id,
"is_active": True
},
order_by="schedule_date"
)
logger.info("Retrieved active production schedules",
count=len(schedules),
tenant_id=tenant_id)
return schedules
except Exception as e:
logger.error("Error fetching active schedules", error=str(e))
raise DatabaseError(f"Failed to fetch active schedules: {str(e)}")
@transactional
async def finalize_schedule(
self,
schedule_id: UUID,
finalized_by: str
) -> ProductionSchedule:
"""Finalize a production schedule"""
try:
schedule = await self.get(schedule_id)
if not schedule:
raise ValidationError(f"Schedule {schedule_id} not found")
if schedule.is_finalized:
raise ValidationError("Schedule is already finalized")
update_data = {
"is_finalized": True,
"finalized_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
schedule = await self.update(schedule_id, update_data)
logger.info("Production schedule finalized",
schedule_id=str(schedule_id),
finalized_by=finalized_by)
return schedule
except ValidationError:
raise
except Exception as e:
logger.error("Error finalizing schedule", error=str(e))
raise DatabaseError(f"Failed to finalize schedule: {str(e)}")
@transactional
async def update_schedule_metrics(
self,
schedule_id: UUID,
metrics: Dict[str, Any]
) -> ProductionSchedule:
"""Update production schedule metrics"""
try:
schedule = await self.get(schedule_id)
if not schedule:
raise ValidationError(f"Schedule {schedule_id} not found")
# Validate metrics
valid_metrics = [
"actual_capacity_hours", "total_batches_completed",
"total_quantity_produced", "efficiency_percentage",
"utilization_percentage", "on_time_completion_rate"
]
update_data = {"updated_at": datetime.utcnow()}
for metric, value in metrics.items():
if metric in valid_metrics:
update_data[metric] = value
schedule = await self.update(schedule_id, update_data)
logger.info("Updated schedule metrics",
schedule_id=str(schedule_id),
metrics=list(metrics.keys()))
return schedule
except ValidationError:
raise
except Exception as e:
logger.error("Error updating schedule metrics", error=str(e))
raise DatabaseError(f"Failed to update schedule metrics: {str(e)}")
@transactional
async def get_schedule_performance_summary(
self,
tenant_id: str,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Get schedule performance summary for a date range"""
try:
schedules = await self.get_schedules_by_date_range(tenant_id, start_date, end_date)
total_schedules = len(schedules)
finalized_schedules = len([s for s in schedules if s.is_finalized])
# Calculate averages
total_planned_hours = sum(s.planned_capacity_hours for s in schedules)
total_actual_hours = sum(s.actual_capacity_hours or 0 for s in schedules)
total_overtime = sum(s.overtime_hours or 0 for s in schedules)
# Calculate efficiency metrics
schedules_with_efficiency = [s for s in schedules if s.efficiency_percentage is not None]
avg_efficiency = (
sum(s.efficiency_percentage for s in schedules_with_efficiency) / len(schedules_with_efficiency)
if schedules_with_efficiency else 0
)
schedules_with_utilization = [s for s in schedules if s.utilization_percentage is not None]
avg_utilization = (
sum(s.utilization_percentage for s in schedules_with_utilization) / len(schedules_with_utilization)
if schedules_with_utilization else 0
)
return {
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"total_schedules": total_schedules,
"finalized_schedules": finalized_schedules,
"finalization_rate": (finalized_schedules / total_schedules * 100) if total_schedules > 0 else 0,
"total_planned_hours": total_planned_hours,
"total_actual_hours": total_actual_hours,
"total_overtime_hours": total_overtime,
"capacity_utilization": (total_actual_hours / total_planned_hours * 100) if total_planned_hours > 0 else 0,
"average_efficiency_percentage": round(avg_efficiency, 2),
"average_utilization_percentage": round(avg_utilization, 2),
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating schedule performance summary", error=str(e))
raise DatabaseError(f"Failed to calculate schedule performance summary: {str(e)}")
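
A sketch of the schedule workflow above; the shift field types (date and time here) are an assumption about the ProductionSchedule model:

from datetime import date, time

async def plan_day(session: AsyncSession, tenant_id: str) -> None:
    repo = ProductionScheduleRepository(session)
    schedule = await repo.create_schedule({
        "tenant_id": tenant_id,
        "schedule_date": date(2025, 8, 22),  # one schedule per tenant per date
        "shift_start": time(6, 0),
        "shift_end": time(14, 0),
        "total_capacity_hours": 8.0,
        "planned_capacity_hours": 7.0,
        "staff_count": 4,
    })
    await repo.finalize_schedule(schedule.id, "shift_manager")
    # Only metric names in the repository's whitelist are persisted
    await repo.update_schedule_metrics(schedule.id, {"efficiency_percentage": 92.5})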

View File: services/production/app/repositories/quality_check_repository.py

@@ -0,0 +1,319 @@
"""
Quality Check Repository
Repository for quality check operations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta, date
from uuid import UUID
import structlog
from .base import ProductionBaseRepository
from app.models.production import QualityCheck
from shared.database.exceptions import DatabaseError, ValidationError
from shared.database.transactions import transactional
logger = structlog.get_logger()
class QualityCheckRepository(ProductionBaseRepository):
"""Repository for quality check operations"""
def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 300):
# Quality checks are dynamic, short cache time (5 minutes)
super().__init__(QualityCheck, session, cache_ttl)
@transactional
async def create_quality_check(self, check_data: Dict[str, Any]) -> QualityCheck:
"""Create a new quality check with validation"""
try:
# Validate check data
validation_result = self._validate_production_data(
check_data,
["tenant_id", "batch_id", "check_type", "check_time",
"quality_score", "pass_fail"]
)
if not validation_result["is_valid"]:
raise ValidationError(f"Invalid quality check data: {validation_result['errors']}")
            # Validate quality score range (1-10); check against None so a
            # score of 0 is range-checked instead of silently skipped
            if check_data.get("quality_score") is not None:
                score = float(check_data["quality_score"])
                if score < 1 or score > 10:
                    raise ValidationError("Quality score must be between 1 and 10")
# Set default values
if "defect_count" not in check_data:
check_data["defect_count"] = 0
if "corrective_action_needed" not in check_data:
check_data["corrective_action_needed"] = False
# Create quality check
quality_check = await self.create(check_data)
logger.info("Quality check created successfully",
check_id=str(quality_check.id),
batch_id=str(quality_check.batch_id),
check_type=quality_check.check_type,
quality_score=quality_check.quality_score,
tenant_id=str(quality_check.tenant_id))
return quality_check
except ValidationError:
raise
except Exception as e:
logger.error("Error creating quality check", error=str(e))
raise DatabaseError(f"Failed to create quality check: {str(e)}")
@transactional
async def get_checks_by_batch(
self,
tenant_id: str,
batch_id: str
) -> List[QualityCheck]:
"""Get all quality checks for a specific batch"""
try:
checks = await self.get_multi(
filters={
"tenant_id": tenant_id,
"batch_id": batch_id
},
order_by="check_time"
)
logger.info("Retrieved quality checks by batch",
count=len(checks),
batch_id=batch_id,
tenant_id=tenant_id)
return checks
except Exception as e:
logger.error("Error fetching quality checks by batch", error=str(e))
raise DatabaseError(f"Failed to fetch quality checks by batch: {str(e)}")
@transactional
async def get_checks_by_date_range(
self,
tenant_id: str,
start_date: date,
end_date: date,
check_type: Optional[str] = None
) -> List[QualityCheck]:
"""Get quality checks within a date range"""
try:
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
filters = {
"tenant_id": tenant_id,
"check_time__gte": start_datetime,
"check_time__lte": end_datetime
}
if check_type:
filters["check_type"] = check_type
checks = await self.get_multi(
filters=filters,
order_by="check_time",
order_desc=True
)
logger.info("Retrieved quality checks by date range",
count=len(checks),
start_date=start_date.isoformat(),
end_date=end_date.isoformat(),
tenant_id=tenant_id)
return checks
except Exception as e:
logger.error("Error fetching quality checks by date range", error=str(e))
raise DatabaseError(f"Failed to fetch quality checks by date range: {str(e)}")
@transactional
async def get_failed_checks(
self,
tenant_id: str,
days_back: int = 7
) -> List[QualityCheck]:
"""Get failed quality checks from the last N days"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=days_back)
checks = await self.get_multi(
filters={
"tenant_id": tenant_id,
"pass_fail": False,
"check_time__gte": cutoff_date
},
order_by="check_time",
order_desc=True
)
logger.info("Retrieved failed quality checks",
count=len(checks),
days_back=days_back,
tenant_id=tenant_id)
return checks
except Exception as e:
logger.error("Error fetching failed quality checks", error=str(e))
raise DatabaseError(f"Failed to fetch failed quality checks: {str(e)}")
@transactional
async def get_quality_metrics(
self,
tenant_id: str,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Get quality metrics for a tenant and date range"""
try:
checks = await self.get_checks_by_date_range(tenant_id, start_date, end_date)
total_checks = len(checks)
passed_checks = len([c for c in checks if c.pass_fail])
failed_checks = total_checks - passed_checks
# Calculate average quality score
quality_scores = [c.quality_score for c in checks if c.quality_score is not None]
avg_quality_score = sum(quality_scores) / len(quality_scores) if quality_scores else 0
# Calculate defect rate
total_defects = sum(c.defect_count for c in checks)
avg_defects_per_check = total_defects / total_checks if total_checks > 0 else 0
# Group by check type
by_check_type = {}
for check in checks:
check_type = check.check_type
if check_type not in by_check_type:
by_check_type[check_type] = {
"total_checks": 0,
"passed_checks": 0,
"failed_checks": 0,
"avg_quality_score": 0,
"total_defects": 0
}
by_check_type[check_type]["total_checks"] += 1
if check.pass_fail:
by_check_type[check_type]["passed_checks"] += 1
else:
by_check_type[check_type]["failed_checks"] += 1
by_check_type[check_type]["total_defects"] += check.defect_count
            # Calculate pass rates and score averages by check type; iterating
            # items() keeps check_type paired with its own bucket instead of
            # reusing the loop variable left over from the grouping pass above
            for check_type, type_data in by_check_type.items():
                if type_data["total_checks"] > 0:
                    type_data["pass_rate"] = round(
                        (type_data["passed_checks"] / type_data["total_checks"]) * 100, 2
                    )
                else:
                    type_data["pass_rate"] = 0
                type_scores = [c.quality_score for c in checks
                               if c.check_type == check_type and c.quality_score is not None]
                type_data["avg_quality_score"] = round(
                    sum(type_scores) / len(type_scores) if type_scores else 0, 2
                )
# Identify trends
checks_needing_action = len([c for c in checks if c.corrective_action_needed])
return {
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"total_checks": total_checks,
"passed_checks": passed_checks,
"failed_checks": failed_checks,
"pass_rate_percentage": round((passed_checks / total_checks * 100) if total_checks > 0 else 0, 2),
"average_quality_score": round(avg_quality_score, 2),
"total_defects": total_defects,
"average_defects_per_check": round(avg_defects_per_check, 2),
"checks_needing_corrective_action": checks_needing_action,
"by_check_type": by_check_type,
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating quality metrics", error=str(e))
raise DatabaseError(f"Failed to calculate quality metrics: {str(e)}")
@transactional
async def get_quality_trends(
self,
tenant_id: str,
check_type: str,
days_back: int = 30
) -> Dict[str, Any]:
"""Get quality trends for a specific check type"""
try:
end_date = datetime.utcnow().date()
start_date = end_date - timedelta(days=days_back)
checks = await self.get_checks_by_date_range(
tenant_id, start_date, end_date, check_type
)
# Group by date
daily_metrics = {}
for check in checks:
check_date = check.check_time.date()
if check_date not in daily_metrics:
daily_metrics[check_date] = {
"total_checks": 0,
"passed_checks": 0,
"quality_scores": [],
"defect_count": 0
}
daily_metrics[check_date]["total_checks"] += 1
if check.pass_fail:
daily_metrics[check_date]["passed_checks"] += 1
if check.quality_score is not None:
daily_metrics[check_date]["quality_scores"].append(check.quality_score)
daily_metrics[check_date]["defect_count"] += check.defect_count
# Calculate daily pass rates and averages
trend_data = []
for date_key, metrics in sorted(daily_metrics.items()):
pass_rate = (metrics["passed_checks"] / metrics["total_checks"] * 100) if metrics["total_checks"] > 0 else 0
avg_score = sum(metrics["quality_scores"]) / len(metrics["quality_scores"]) if metrics["quality_scores"] else 0
trend_data.append({
"date": date_key.isoformat(),
"total_checks": metrics["total_checks"],
"pass_rate": round(pass_rate, 2),
"average_quality_score": round(avg_score, 2),
"total_defects": metrics["defect_count"]
})
            # Compare the last 7 days against the earlier window; with 7 or
            # fewer days of data there is no earlier window to compare
            if len(trend_data) > 7:
                recent_avg = sum(d["pass_rate"] for d in trend_data[-7:]) / 7
                earlier_avg = sum(d["pass_rate"] for d in trend_data[:-7]) / (len(trend_data) - 7)
                trend_direction = (
                    "improving" if recent_avg > earlier_avg
                    else "declining" if recent_avg < earlier_avg
                    else "stable"
                )
            else:
                trend_direction = "insufficient_data"
return {
"check_type": check_type,
"period_start": start_date.isoformat(),
"period_end": end_date.isoformat(),
"trend_direction": trend_direction,
"daily_data": trend_data,
"total_checks": len(checks),
"tenant_id": tenant_id
}
except Exception as e:
logger.error("Error calculating quality trends", error=str(e))
raise DatabaseError(f"Failed to calculate quality trends: {str(e)}")
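
Finally, a sketch of recording a check and pulling the aggregated metrics, reusing this module's imports; the check type and dates are illustrative:

async def record_check(session: AsyncSession, tenant_id: str, batch_id: str) -> None:
    repo = QualityCheckRepository(session)
    await repo.create_quality_check({
        "tenant_id": tenant_id,
        "batch_id": batch_id,
        "check_type": "crumb_structure",  # illustrative free-form type
        "check_time": datetime.utcnow(),
        "quality_score": 8.5,             # validated against the 1-10 range
        "pass_fail": True,
    })
    metrics = await repo.get_quality_metrics(
        tenant_id, date(2025, 8, 1), date(2025, 8, 21)
    )
    print(metrics["pass_rate_percentage"], metrics["average_quality_score"])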