Files
bakery-ia/services/procurement/app/ml/price_insights_orchestrator.py
2025-11-05 13:34:56 +01:00

372 lines
13 KiB
Python

"""
Price Insights Orchestrator

Coordinates price forecasting (delegated to ``PriceForecaster``) and posting
of the resulting insights to the AI Insights Service (via ``AIInsightsClient``).
"""
import pandas as pd
from typing import Dict, List, Any, Optional
import structlog
from datetime import datetime
from uuid import UUID
import sys
import os

# Make the ``shared`` package importable by appending the directory four
# levels above this file's directory (presumably the repository root --
# confirm against the deployment layout). This MUST run before the
# ``shared.clients`` import below; do not reorder these lines.
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..'))
from shared.clients.ai_insights_client import AIInsightsClient
from app.ml.price_forecaster import PriceForecaster

# Module-level structlog logger used by PriceInsightsOrchestrator.
logger = structlog.get_logger()
class PriceInsightsOrchestrator:
    """
    Orchestrates price forecasting and insight generation workflow.

    Workflow:
    1. Forecast prices from historical data (delegated to ``PriceForecaster``)
    2. Read the buy/wait/bulk recommendations from the forecast result
    3. Post insights to the AI Insights Service (via ``AIInsightsClient``)
    4. Expose cached forecaster data for procurement planning

    Call ``close()`` when finished to release the insights client's HTTP
    connections.
    """

    def __init__(
        self,
        ai_insights_base_url: str = "http://ai-insights-service:8000"
    ):
        """
        Create the price forecaster and the AI Insights Service client.

        Args:
            ai_insights_base_url: Base URL handed to ``AIInsightsClient``.
        """
        self.forecaster = PriceForecaster()
        self.ai_insights_client = AIInsightsClient(ai_insights_base_url)

    async def forecast_and_post_insights(
        self,
        tenant_id: str,
        ingredient_id: str,
        price_history: pd.DataFrame,
        forecast_horizon_days: int = 30,
        min_history_days: int = 180
    ) -> Dict[str, Any]:
        """
        Complete workflow for one ingredient: forecast prices and post insights.

        Args:
            tenant_id: Tenant identifier; must be a UUID string whenever there
                are insights to post (it is parsed with ``UUID()``).
            ingredient_id: Ingredient identifier.
            price_history: Historical price data for the ingredient.
            forecast_horizon_days: Days to forecast ahead.
            min_history_days: Minimum days of history required.

        Returns:
            Dict with ``tenant_id``, ``ingredient_id``, ``forecasted_at``,
            ``history_days``, ``forecast``, ``recommendation``,
            ``bulk_opportunity``, ``insights_generated``, ``insights_posted``,
            ``insights_failed`` and ``created_insights``.

        Raises:
            ValueError: ``tenant_id`` is not a valid UUID string and there
                are insights to post.
            KeyError: the forecast result lacks ``forecasted_at`` or
                ``history_days``.
            Any exception raised by the forecaster or the insights client
            propagates unchanged.
        """
        logger.info(
            "Starting price forecasting workflow",
            tenant_id=tenant_id,
            ingredient_id=ingredient_id,
            # Row count of price_history, logged under the history_days key.
            history_days=len(price_history)
        )

        # Step 1: Forecast prices
        forecast_results = await self.forecaster.forecast_price(
            tenant_id=tenant_id,
            ingredient_id=ingredient_id,
            price_history=price_history,
            forecast_horizon_days=forecast_horizon_days,
            min_history_days=min_history_days
        )
        raw_insights = forecast_results.get('insights', [])

        logger.info(
            "Price forecasting complete",
            ingredient_id=ingredient_id,
            recommendation=forecast_results.get('recommendations', {}).get('action'),
            insights_generated=len(raw_insights)
        )

        # Step 2: Enrich insights with tenant_id and ingredient context
        enriched_insights = self._enrich_insights(
            raw_insights,
            tenant_id,
            ingredient_id
        )

        # Step 3: Post insights to AI Insights Service; skip the HTTP call
        # entirely when there is nothing to send.
        if enriched_insights:
            post_results = await self.ai_insights_client.create_insights_bulk(
                tenant_id=UUID(tenant_id),
                insights=enriched_insights
            )
            logger.info(
                "Price insights posted to AI Insights Service",
                ingredient_id=ingredient_id,
                total=post_results['total'],
                successful=post_results['successful'],
                failed=post_results['failed']
            )
        else:
            post_results = {'total': 0, 'successful': 0, 'failed': 0}
            logger.info("No insights to post for ingredient", ingredient_id=ingredient_id)

        # Step 4: Return comprehensive results
        return {
            'tenant_id': tenant_id,
            'ingredient_id': ingredient_id,
            'forecasted_at': forecast_results['forecasted_at'],
            'history_days': forecast_results['history_days'],
            'forecast': forecast_results.get('forecast', {}),
            'recommendation': forecast_results.get('recommendations', {}),
            'bulk_opportunity': forecast_results.get('bulk_opportunities', {}),
            'insights_generated': len(enriched_insights),
            'insights_posted': post_results['successful'],
            'insights_failed': post_results['failed'],
            'created_insights': post_results.get('created_insights', [])
        }

    def _enrich_insights(
        self,
        insights: List[Dict[str, Any]],
        tenant_id: str,
        ingredient_id: str
    ) -> List[Dict[str, Any]]:
        """
        Enrich insights with required fields for AI Insights Service.

        The input dicts are never modified: each insight is copied, and its
        ``metrics_json`` is copied too before ``ingredient_id`` is added.

        Args:
            insights: Raw insights from the forecaster (or the portfolio
                summary insight).
            tenant_id: Tenant identifier stored on every insight.
            ingredient_id: Ingredient identifier stored in ``metrics_json``.

        Returns:
            New list of enriched insight dicts in input order. Each one has
            ``tenant_id``, ``metrics_json['ingredient_id']``,
            ``source_service``, ``source_model`` and ``detected_at`` set;
            all entries share the same ``detected_at`` value.
        """
        # One timestamp for the whole batch so insights detected together
        # carry an identical detected_at.
        # NOTE: datetime.utcnow() returns a naive UTC datetime (its ISO string
        # has no offset); kept for format compatibility with existing
        # consumers even though it is deprecated since Python 3.12.
        detected_at = datetime.utcnow().isoformat()

        enriched = []
        for insight in insights:
            enriched_insight = insight.copy()
            enriched_insight['tenant_id'] = tenant_id

            # insight.copy() is shallow, so the nested metrics_json dict must
            # be copied as well -- otherwise adding ingredient_id would write
            # into the caller's insight. A missing or None value starts empty.
            metrics = dict(enriched_insight.get('metrics_json') or {})
            metrics['ingredient_id'] = ingredient_id
            enriched_insight['metrics_json'] = metrics

            # Source metadata
            enriched_insight['source_service'] = 'procurement'
            enriched_insight['source_model'] = 'price_forecaster'
            enriched_insight['detected_at'] = detected_at
            enriched.append(enriched_insight)
        return enriched

    async def forecast_all_ingredients(
        self,
        tenant_id: str,
        ingredients_data: Dict[str, pd.DataFrame],
        forecast_horizon_days: int = 30,
        min_history_days: int = 180
    ) -> Dict[str, Any]:
        """
        Forecast prices for all ingredients for a tenant.

        Processing is best-effort: an ingredient whose workflow raises is
        logged and skipped (it does not appear in ``ingredient_results``).
        When at least one ingredient has a ``buy_now`` recommendation or a
        bulk opportunity, a portfolio summary insight is also posted; a
        failure to post it is logged and does not discard the other results.

        Args:
            tenant_id: Tenant identifier.
            ingredients_data: Dict of {ingredient_id: price_history DataFrame}.
            forecast_horizon_days: Days to forecast.
            min_history_days: Minimum history required.

        Returns:
            Dict with ``tenant_id``, ``forecasted_at`` (naive UTC ISO string),
            ``ingredients_forecasted``, ``ingredient_results``,
            ``total_insights_posted``, ``buy_now_count``, ``wait_count`` and
            ``bulk_opportunity_count``.
        """
        logger.info(
            "Forecasting prices for all ingredients",
            tenant_id=tenant_id,
            ingredients=len(ingredients_data)
        )

        all_results = []
        total_insights_posted = 0
        buy_now_count = 0
        wait_count = 0
        bulk_opportunity_count = 0

        # Forecast each ingredient
        for ingredient_id, price_history in ingredients_data.items():
            try:
                results = await self.forecast_and_post_insights(
                    tenant_id=tenant_id,
                    ingredient_id=ingredient_id,
                    price_history=price_history,
                    forecast_horizon_days=forecast_horizon_days,
                    min_history_days=min_history_days
                )
                all_results.append(results)
                total_insights_posted += results['insights_posted']

                # Count recommendations
                action = results['recommendation'].get('action')
                if action == 'buy_now':
                    buy_now_count += 1
                elif action in ('wait', 'wait_for_dip'):
                    wait_count += 1
                if results['bulk_opportunity'].get('has_bulk_opportunity'):
                    bulk_opportunity_count += 1
            except Exception as e:
                # Best-effort boundary: one failing ingredient must not abort
                # the whole batch. exc_info keeps the traceback in the log.
                logger.error(
                    "Error forecasting ingredient",
                    ingredient_id=ingredient_id,
                    error=str(e),
                    exc_info=True
                )

        # Generate and post the portfolio-level summary insight
        total_insights_posted += await self._post_portfolio_summary(
            tenant_id, all_results, buy_now_count, wait_count, bulk_opportunity_count
        )

        logger.info(
            "All ingredients forecasting complete",
            tenant_id=tenant_id,
            ingredients_forecasted=len(all_results),
            total_insights_posted=total_insights_posted,
            buy_now_recommendations=buy_now_count,
            bulk_opportunities=bulk_opportunity_count
        )

        return {
            'tenant_id': tenant_id,
            'forecasted_at': datetime.utcnow().isoformat(),
            'ingredients_forecasted': len(all_results),
            'ingredient_results': all_results,
            'total_insights_posted': total_insights_posted,
            'buy_now_count': buy_now_count,
            'wait_count': wait_count,
            'bulk_opportunity_count': bulk_opportunity_count
        }

    async def _post_portfolio_summary(
        self,
        tenant_id: str,
        all_results: List[Dict[str, Any]],
        buy_now_count: int,
        wait_count: int,
        bulk_opportunity_count: int
    ) -> int:
        """
        Build, enrich and post the portfolio summary insight.

        Returns:
            Number of summary insights the service reported as successfully
            created; 0 when no summary was warranted or posting failed.
        """
        if buy_now_count == 0 and bulk_opportunity_count == 0:
            return 0

        summary_insight = self._generate_portfolio_summary_insight(
            tenant_id, all_results, buy_now_count, wait_count, bulk_opportunity_count
        )
        if not summary_insight:
            return 0

        enriched_summary = self._enrich_insights(
            [summary_insight], tenant_id, 'all_ingredients'
        )
        try:
            post_results = await self.ai_insights_client.create_insights_bulk(
                tenant_id=UUID(tenant_id),
                insights=enriched_summary
            )
        except Exception as e:
            # Same best-effort policy as the per-ingredient loop: the
            # per-ingredient results are already computed and posted.
            logger.error(
                "Error posting portfolio summary insight",
                tenant_id=tenant_id,
                error=str(e),
                exc_info=True
            )
            return 0
        return post_results['successful']

    def _generate_portfolio_summary_insight(
        self,
        tenant_id: str,
        all_results: List[Dict[str, Any]],
        buy_now_count: int,
        wait_count: int,
        bulk_opportunity_count: int
    ) -> Optional[Dict[str, Any]]:
        """
        Generate portfolio-level summary insight.

        Args:
            tenant_id: Tenant identifier (embedded in the action params).
            all_results: All ingredient forecast results.
            buy_now_count: Number of buy now recommendations.
            wait_count: Number of wait recommendations.
            bulk_opportunity_count: Number of bulk opportunities.

        Returns:
            Summary insight dict, or None when there are neither buy-now
            recommendations nor bulk opportunities.
        """
        if buy_now_count == 0 and bulk_opportunity_count == 0:
            return None

        # Simplified estimate: mean of potential_savings_pct over the
        # ingredients flagged with a bulk opportunity. Missing/None values
        # count as 0.
        total_potential_savings = 0
        for result in all_results:
            bulk_opp = result.get('bulk_opportunity') or {}
            if bulk_opp.get('has_bulk_opportunity'):
                total_potential_savings += bulk_opp.get('potential_savings_pct') or 0
        avg_potential_savings = total_potential_savings / max(1, bulk_opportunity_count)

        description_parts = []
        if buy_now_count > 0:
            description_parts.append(f'{buy_now_count} ingredients show price increases - purchase soon')
        if bulk_opportunity_count > 0:
            description_parts.append(f'{bulk_opportunity_count} ingredients have bulk buying opportunities (avg {avg_potential_savings:.1f}% savings)')

        return {
            'type': 'recommendation',
            'priority': 'high' if buy_now_count > 2 else 'medium',
            'category': 'procurement',
            'title': f'Procurement Timing Opportunities: {buy_now_count + bulk_opportunity_count} Items',
            'description': 'Price forecast analysis identified procurement timing opportunities. ' + '. '.join(description_parts) + '.',
            'impact_type': 'cost_optimization',
            # Savings percentage when bulk opportunities exist, otherwise the
            # number of buy-now items (impact_unit says which).
            'impact_value': avg_potential_savings if bulk_opportunity_count > 0 else buy_now_count,
            'impact_unit': 'percentage' if bulk_opportunity_count > 0 else 'items',
            'confidence': 75,
            'metrics_json': {
                'ingredients_analyzed': len(all_results),
                'buy_now_count': buy_now_count,
                'wait_count': wait_count,
                'bulk_opportunity_count': bulk_opportunity_count,
                'avg_potential_savings_pct': round(avg_potential_savings, 2)
            },
            'actionable': True,
            'recommendation_actions': [
                {
                    'label': 'Review Price Forecasts',
                    'action': 'review_price_forecasts',
                    'params': {'tenant_id': tenant_id}
                },
                {
                    'label': 'Create Optimized Orders',
                    'action': 'create_optimized_purchase_orders',
                    'params': {'tenant_id': tenant_id}
                }
            ],
            'source_service': 'procurement',
            'source_model': 'price_forecaster'
        }

    async def get_price_forecast(
        self,
        ingredient_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get cached seasonal patterns for an ingredient.

        NOTE(review): despite its name, this returns
        ``forecaster.get_seasonal_patterns(...)``, not a price forecast.
        Behavior is kept for existing callers -- confirm the intent.

        Args:
            ingredient_id: Ingredient identifier.

        Returns:
            Whatever ``get_seasonal_patterns`` returns (presumably None when
            the ingredient has not been forecasted).
        """
        return self.forecaster.get_seasonal_patterns(ingredient_id)

    async def get_volatility_assessment(
        self,
        ingredient_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get the volatility assessment for an ingredient from the forecaster.

        Args:
            ingredient_id: Ingredient identifier.

        Returns:
            Whatever ``forecaster.get_volatility_score`` returns (presumably
            None when the ingredient has not been assessed).
        """
        return self.forecaster.get_volatility_score(ingredient_id)

    async def close(self):
        """Close HTTP client connections held by the AI Insights client."""
        await self.ai_insights_client.close()