628 lines
25 KiB
Python
628 lines
25 KiB
Python
"""
|
|
Tenant Locations API - Handles tenant location operations
|
|
"""
|
|
|
|
import structlog
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Path, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from uuid import UUID
|
|
|
|
from app.schemas.tenant_locations import (
|
|
TenantLocationCreate,
|
|
TenantLocationUpdate,
|
|
TenantLocationResponse,
|
|
TenantLocationsResponse,
|
|
TenantLocationTypeFilter
|
|
)
|
|
from app.repositories.tenant_location_repository import TenantLocationRepository
|
|
from shared.auth.decorators import get_current_user_dep
|
|
from shared.auth.access_control import admin_role_required
|
|
from shared.monitoring.metrics import track_endpoint_metrics
|
|
from shared.routing.route_builder import RouteBuilder
|
|
|
|
logger = structlog.get_logger()
|
|
router = APIRouter()
|
|
route_builder = RouteBuilder("tenants")
|
|
|
|
|
|
# Dependency injection for tenant location repository
|
|
async def get_tenant_location_repository():
|
|
"""Get tenant location repository instance with proper session management"""
|
|
try:
|
|
from app.core.database import database_manager
|
|
|
|
# Use async context manager properly to ensure session is closed
|
|
async with database_manager.get_session() as session:
|
|
yield TenantLocationRepository(session)
|
|
except Exception as e:
|
|
logger.error("Failed to create tenant location repository", error=str(e))
|
|
raise HTTPException(status_code=500, detail="Service initialization failed")
|
|
|
|
|
|
@router.get(route_builder.build_base_route("{tenant_id}/locations", include_tenant_prefix=False), response_model=TenantLocationsResponse)
|
|
@track_endpoint_metrics("tenant_locations_list")
|
|
async def get_tenant_locations(
|
|
tenant_id: UUID = Path(..., description="Tenant ID"),
|
|
location_types: str = Query(None, description="Comma-separated list of location types to filter"),
|
|
is_active: Optional[bool] = Query(None, description="Filter by active status"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
location_repo: TenantLocationRepository = Depends(get_tenant_location_repository)
|
|
):
|
|
"""
|
|
Get all locations for a tenant.
|
|
|
|
Args:
|
|
tenant_id: ID of the tenant to get locations for
|
|
location_types: Optional comma-separated list of location types to filter (e.g., "central_production,retail_outlet")
|
|
is_active: Optional filter for active locations only
|
|
current_user: Current user making the request
|
|
location_repo: Tenant location repository instance
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Get tenant locations request received",
|
|
tenant_id=str(tenant_id),
|
|
user_id=current_user.get("user_id"),
|
|
location_types=location_types,
|
|
is_active=is_active
|
|
)
|
|
|
|
# Check that the user has access to this tenant
|
|
# This would typically be checked via access control middleware
|
|
# For now, we'll trust the gateway has validated tenant access
|
|
|
|
locations = []
|
|
|
|
if location_types:
|
|
# Filter by specific location types
|
|
types_list = [t.strip() for t in location_types.split(",")]
|
|
locations = await location_repo.get_locations_by_tenant_with_type(str(tenant_id), types_list)
|
|
elif is_active is True:
|
|
# Get only active locations
|
|
locations = await location_repo.get_active_locations_by_tenant(str(tenant_id))
|
|
elif is_active is False:
|
|
# Get only inactive locations (by getting all and filtering in memory - not efficient but functional)
|
|
all_locations = await location_repo.get_locations_by_tenant(str(tenant_id))
|
|
locations = [loc for loc in all_locations if not loc.is_active]
|
|
else:
|
|
# Get all locations
|
|
locations = await location_repo.get_locations_by_tenant(str(tenant_id))
|
|
|
|
logger.debug(
|
|
"Get tenant locations successful",
|
|
tenant_id=str(tenant_id),
|
|
location_count=len(locations)
|
|
)
|
|
|
|
# Convert to response format - handle metadata field to avoid SQLAlchemy conflicts
|
|
location_responses = []
|
|
for loc in locations:
|
|
# Create dict from ORM object manually to handle metadata field properly
|
|
loc_dict = {
|
|
'id': str(loc.id),
|
|
'tenant_id': str(loc.tenant_id),
|
|
'name': loc.name,
|
|
'location_type': loc.location_type,
|
|
'address': loc.address,
|
|
'city': loc.city,
|
|
'postal_code': loc.postal_code,
|
|
'latitude': loc.latitude,
|
|
'longitude': loc.longitude,
|
|
'contact_person': loc.contact_person,
|
|
'contact_phone': loc.contact_phone,
|
|
'contact_email': loc.contact_email,
|
|
'is_active': loc.is_active,
|
|
'delivery_windows': loc.delivery_windows,
|
|
'operational_hours': loc.operational_hours,
|
|
'capacity': loc.capacity,
|
|
'max_delivery_radius_km': loc.max_delivery_radius_km,
|
|
'delivery_schedule_config': loc.delivery_schedule_config,
|
|
'metadata': loc.metadata_, # Use the actual column name to avoid conflict
|
|
'created_at': loc.created_at,
|
|
'updated_at': loc.updated_at
|
|
}
|
|
location_responses.append(TenantLocationResponse.model_validate(loc_dict))
|
|
|
|
return TenantLocationsResponse(
|
|
locations=location_responses,
|
|
total=len(location_responses)
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Get tenant locations failed",
|
|
tenant_id=str(tenant_id),
|
|
user_id=current_user.get("user_id"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Get tenant locations failed"
|
|
)
|
|
|
|
|
|
@router.get(route_builder.build_base_route("{tenant_id}/locations/{location_id}", include_tenant_prefix=False), response_model=TenantLocationResponse)
|
|
@track_endpoint_metrics("tenant_location_get")
|
|
async def get_tenant_location(
|
|
tenant_id: UUID = Path(..., description="Tenant ID"),
|
|
location_id: UUID = Path(..., description="Location ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
location_repo: TenantLocationRepository = Depends(get_tenant_location_repository)
|
|
):
|
|
"""
|
|
Get a specific location for a tenant.
|
|
|
|
Args:
|
|
tenant_id: ID of the tenant
|
|
location_id: ID of the location to retrieve
|
|
current_user: Current user making the request
|
|
location_repo: Tenant location repository instance
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Get tenant location request received",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Get the specific location
|
|
location = await location_repo.get_location_by_id(str(location_id))
|
|
|
|
if not location:
|
|
logger.warning(
|
|
"Location not found",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
# Verify that the location belongs to the specified tenant
|
|
if str(location.tenant_id) != str(tenant_id):
|
|
logger.warning(
|
|
"Location does not belong to tenant",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
location_tenant_id=str(location.tenant_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
logger.debug(
|
|
"Get tenant location successful",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Create dict from ORM object manually to handle metadata field properly
|
|
loc_dict = {
|
|
'id': str(location.id),
|
|
'tenant_id': str(location.tenant_id),
|
|
'name': location.name,
|
|
'location_type': location.location_type,
|
|
'address': location.address,
|
|
'city': location.city,
|
|
'postal_code': location.postal_code,
|
|
'latitude': location.latitude,
|
|
'longitude': location.longitude,
|
|
'contact_person': location.contact_person,
|
|
'contact_phone': location.contact_phone,
|
|
'contact_email': location.contact_email,
|
|
'is_active': location.is_active,
|
|
'delivery_windows': location.delivery_windows,
|
|
'operational_hours': location.operational_hours,
|
|
'capacity': location.capacity,
|
|
'max_delivery_radius_km': location.max_delivery_radius_km,
|
|
'delivery_schedule_config': location.delivery_schedule_config,
|
|
'metadata': location.metadata_, # Use the actual column name to avoid conflict
|
|
'created_at': location.created_at,
|
|
'updated_at': location.updated_at
|
|
}
|
|
return TenantLocationResponse.model_validate(loc_dict)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Get tenant location failed",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Get tenant location failed"
|
|
)
|
|
|
|
|
|
@router.post(route_builder.build_base_route("{tenant_id}/locations", include_tenant_prefix=False), response_model=TenantLocationResponse)
|
|
@admin_role_required
|
|
async def create_tenant_location(
|
|
location_data: TenantLocationCreate,
|
|
tenant_id: UUID = Path(..., description="Tenant ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
location_repo: TenantLocationRepository = Depends(get_tenant_location_repository)
|
|
):
|
|
"""
|
|
Create a new location for a tenant.
|
|
Requires admin or owner privileges.
|
|
|
|
Args:
|
|
location_data: Location data to create
|
|
tenant_id: ID of the tenant to create location for
|
|
current_user: Current user making the request
|
|
location_repo: Tenant location repository instance
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Create tenant location request received",
|
|
tenant_id=str(tenant_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Verify that the tenant_id in the path matches the one in the data
|
|
if str(tenant_id) != location_data.tenant_id:
|
|
logger.warning(
|
|
"Tenant ID mismatch",
|
|
path_tenant_id=str(tenant_id),
|
|
data_tenant_id=location_data.tenant_id,
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Tenant ID in path does not match data"
|
|
)
|
|
|
|
# Prepare location data by excluding unset values
|
|
location_dict = location_data.model_dump(exclude_unset=True)
|
|
# Ensure tenant_id comes from the path for security
|
|
location_dict['tenant_id'] = str(tenant_id)
|
|
|
|
created_location = await location_repo.create_location(location_dict)
|
|
|
|
logger.info(
|
|
"Created tenant location successfully",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(created_location.id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Create dict from ORM object manually to handle metadata field properly
|
|
loc_dict = {
|
|
'id': str(created_location.id),
|
|
'tenant_id': str(created_location.tenant_id),
|
|
'name': created_location.name,
|
|
'location_type': created_location.location_type,
|
|
'address': created_location.address,
|
|
'city': created_location.city,
|
|
'postal_code': created_location.postal_code,
|
|
'latitude': created_location.latitude,
|
|
'longitude': created_location.longitude,
|
|
'contact_person': created_location.contact_person,
|
|
'contact_phone': created_location.contact_phone,
|
|
'contact_email': created_location.contact_email,
|
|
'is_active': created_location.is_active,
|
|
'delivery_windows': created_location.delivery_windows,
|
|
'operational_hours': created_location.operational_hours,
|
|
'capacity': created_location.capacity,
|
|
'max_delivery_radius_km': created_location.max_delivery_radius_km,
|
|
'delivery_schedule_config': created_location.delivery_schedule_config,
|
|
'metadata': created_location.metadata_, # Use the actual column name to avoid conflict
|
|
'created_at': created_location.created_at,
|
|
'updated_at': created_location.updated_at
|
|
}
|
|
return TenantLocationResponse.model_validate(loc_dict)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Create tenant location failed",
|
|
tenant_id=str(tenant_id),
|
|
user_id=current_user.get("user_id"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Create tenant location failed"
|
|
)
|
|
|
|
|
|
@router.put(route_builder.build_base_route("{tenant_id}/locations/{location_id}", include_tenant_prefix=False), response_model=TenantLocationResponse)
|
|
@admin_role_required
|
|
async def update_tenant_location(
|
|
update_data: TenantLocationUpdate,
|
|
tenant_id: UUID = Path(..., description="Tenant ID"),
|
|
location_id: UUID = Path(..., description="Location ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
location_repo: TenantLocationRepository = Depends(get_tenant_location_repository)
|
|
):
|
|
"""
|
|
Update a tenant location.
|
|
Requires admin or owner privileges.
|
|
|
|
Args:
|
|
update_data: Location data to update
|
|
tenant_id: ID of the tenant
|
|
location_id: ID of the location to update
|
|
current_user: Current user making the request
|
|
location_repo: Tenant location repository instance
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Update tenant location request received",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Check if the location exists and belongs to the tenant
|
|
existing_location = await location_repo.get_location_by_id(str(location_id))
|
|
if not existing_location:
|
|
logger.warning(
|
|
"Location not found for update",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
if str(existing_location.tenant_id) != str(tenant_id):
|
|
logger.warning(
|
|
"Location does not belong to tenant for update",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
location_tenant_id=str(existing_location.tenant_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
# Prepare update data by excluding unset values
|
|
update_dict = update_data.model_dump(exclude_unset=True)
|
|
|
|
updated_location = await location_repo.update_location(str(location_id), update_dict)
|
|
|
|
if not updated_location:
|
|
logger.error(
|
|
"Failed to update location (not found after verification)",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
logger.info(
|
|
"Updated tenant location successfully",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Create dict from ORM object manually to handle metadata field properly
|
|
loc_dict = {
|
|
'id': str(updated_location.id),
|
|
'tenant_id': str(updated_location.tenant_id),
|
|
'name': updated_location.name,
|
|
'location_type': updated_location.location_type,
|
|
'address': updated_location.address,
|
|
'city': updated_location.city,
|
|
'postal_code': updated_location.postal_code,
|
|
'latitude': updated_location.latitude,
|
|
'longitude': updated_location.longitude,
|
|
'contact_person': updated_location.contact_person,
|
|
'contact_phone': updated_location.contact_phone,
|
|
'contact_email': updated_location.contact_email,
|
|
'is_active': updated_location.is_active,
|
|
'delivery_windows': updated_location.delivery_windows,
|
|
'operational_hours': updated_location.operational_hours,
|
|
'capacity': updated_location.capacity,
|
|
'max_delivery_radius_km': updated_location.max_delivery_radius_km,
|
|
'delivery_schedule_config': updated_location.delivery_schedule_config,
|
|
'metadata': updated_location.metadata_, # Use the actual column name to avoid conflict
|
|
'created_at': updated_location.created_at,
|
|
'updated_at': updated_location.updated_at
|
|
}
|
|
return TenantLocationResponse.model_validate(loc_dict)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Update tenant location failed",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Update tenant location failed"
|
|
)
|
|
|
|
|
|
@router.delete(route_builder.build_base_route("{tenant_id}/locations/{location_id}", include_tenant_prefix=False))
|
|
@admin_role_required
|
|
async def delete_tenant_location(
|
|
tenant_id: UUID = Path(..., description="Tenant ID"),
|
|
location_id: UUID = Path(..., description="Location ID"),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
location_repo: TenantLocationRepository = Depends(get_tenant_location_repository)
|
|
):
|
|
"""
|
|
Delete a tenant location.
|
|
Requires admin or owner privileges.
|
|
|
|
Args:
|
|
tenant_id: ID of the tenant
|
|
location_id: ID of the location to delete
|
|
current_user: Current user making the request
|
|
location_repo: Tenant location repository instance
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Delete tenant location request received",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Check if the location exists and belongs to the tenant
|
|
existing_location = await location_repo.get_location_by_id(str(location_id))
|
|
if not existing_location:
|
|
logger.warning(
|
|
"Location not found for deletion",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
if str(existing_location.tenant_id) != str(tenant_id):
|
|
logger.warning(
|
|
"Location does not belong to tenant for deletion",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
location_tenant_id=str(existing_location.tenant_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
deleted = await location_repo.delete_location(str(location_id))
|
|
|
|
if not deleted:
|
|
logger.warning(
|
|
"Location not found for deletion (race condition)",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Location not found"
|
|
)
|
|
|
|
logger.info(
|
|
"Deleted tenant location successfully",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
return {
|
|
"message": "Location deleted successfully",
|
|
"location_id": str(location_id)
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Delete tenant location failed",
|
|
tenant_id=str(tenant_id),
|
|
location_id=str(location_id),
|
|
user_id=current_user.get("user_id"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Delete tenant location failed"
|
|
)
|
|
|
|
|
|
@router.get(route_builder.build_base_route("{tenant_id}/locations/type/{location_type}", include_tenant_prefix=False), response_model=TenantLocationsResponse)
|
|
@track_endpoint_metrics("tenant_locations_by_type")
|
|
async def get_tenant_locations_by_type(
|
|
tenant_id: UUID = Path(..., description="Tenant ID"),
|
|
location_type: str = Path(..., description="Location type to filter by", pattern=r'^(central_production|retail_outlet|warehouse|store|branch)$'),
|
|
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
|
location_repo: TenantLocationRepository = Depends(get_tenant_location_repository)
|
|
):
|
|
"""
|
|
Get all locations of a specific type for a tenant.
|
|
|
|
Args:
|
|
tenant_id: ID of the tenant to get locations for
|
|
location_type: Type of location to filter by
|
|
current_user: Current user making the request
|
|
location_repo: Tenant location repository instance
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Get tenant locations by type request received",
|
|
tenant_id=str(tenant_id),
|
|
location_type=location_type,
|
|
user_id=current_user.get("user_id")
|
|
)
|
|
|
|
# Use the method that returns multiple locations by types
|
|
location_list = await location_repo.get_locations_by_tenant_with_type(str(tenant_id), [location_type])
|
|
|
|
logger.debug(
|
|
"Get tenant locations by type successful",
|
|
tenant_id=str(tenant_id),
|
|
location_type=location_type,
|
|
location_count=len(location_list)
|
|
)
|
|
|
|
# Convert to response format - handle metadata field to avoid SQLAlchemy conflicts
|
|
location_responses = []
|
|
for loc in location_list:
|
|
# Create dict from ORM object manually to handle metadata field properly
|
|
loc_dict = {
|
|
'id': str(loc.id),
|
|
'tenant_id': str(loc.tenant_id),
|
|
'name': loc.name,
|
|
'location_type': loc.location_type,
|
|
'address': loc.address,
|
|
'city': loc.city,
|
|
'postal_code': loc.postal_code,
|
|
'latitude': loc.latitude,
|
|
'longitude': loc.longitude,
|
|
'contact_person': loc.contact_person,
|
|
'contact_phone': loc.contact_phone,
|
|
'contact_email': loc.contact_email,
|
|
'is_active': loc.is_active,
|
|
'delivery_windows': loc.delivery_windows,
|
|
'operational_hours': loc.operational_hours,
|
|
'capacity': loc.capacity,
|
|
'max_delivery_radius_km': loc.max_delivery_radius_km,
|
|
'delivery_schedule_config': loc.delivery_schedule_config,
|
|
'metadata': loc.metadata_, # Use the actual column name to avoid conflict
|
|
'created_at': loc.created_at,
|
|
'updated_at': loc.updated_at
|
|
}
|
|
location_responses.append(TenantLocationResponse.model_validate(loc_dict))
|
|
|
|
return TenantLocationsResponse(
|
|
locations=location_responses,
|
|
total=len(location_responses)
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Get tenant locations by type failed",
|
|
tenant_id=str(tenant_id),
|
|
location_type=location_type,
|
|
user_id=current_user.get("user_id"),
|
|
error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Get tenant locations by type failed"
|
|
) |