372 lines
13 KiB
Python
372 lines
13 KiB
Python
"""
|
|
Price Insights Orchestrator
|
|
Coordinates price forecasting and insight posting
|
|
"""
|
|
|
|
import pandas as pd
|
|
from typing import Dict, List, Any, Optional
|
|
import structlog
|
|
from datetime import datetime
|
|
from uuid import UUID
|
|
import sys
|
|
import os
|
|
|
|
# Add shared clients to path
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..'))
|
|
from shared.clients.ai_insights_client import AIInsightsClient
|
|
|
|
from app.ml.price_forecaster import PriceForecaster
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class PriceInsightsOrchestrator:
|
|
"""
|
|
Orchestrates price forecasting and insight generation workflow.
|
|
|
|
Workflow:
|
|
1. Forecast prices from historical data
|
|
2. Generate buy/wait/bulk recommendations
|
|
3. Post insights to AI Insights Service
|
|
4. Provide price forecasts for procurement planning
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
ai_insights_base_url: str = "http://ai-insights-service:8000"
|
|
):
|
|
self.forecaster = PriceForecaster()
|
|
self.ai_insights_client = AIInsightsClient(ai_insights_base_url)
|
|
|
|
async def forecast_and_post_insights(
|
|
self,
|
|
tenant_id: str,
|
|
ingredient_id: str,
|
|
price_history: pd.DataFrame,
|
|
forecast_horizon_days: int = 30,
|
|
min_history_days: int = 180
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Complete workflow: Forecast prices and post insights.
|
|
|
|
Args:
|
|
tenant_id: Tenant identifier
|
|
ingredient_id: Ingredient identifier
|
|
price_history: Historical price data
|
|
forecast_horizon_days: Days to forecast ahead
|
|
min_history_days: Minimum days of history required
|
|
|
|
Returns:
|
|
Workflow results with forecast and posted insights
|
|
"""
|
|
logger.info(
|
|
"Starting price forecasting workflow",
|
|
tenant_id=tenant_id,
|
|
ingredient_id=ingredient_id,
|
|
history_days=len(price_history)
|
|
)
|
|
|
|
# Step 1: Forecast prices
|
|
forecast_results = await self.forecaster.forecast_price(
|
|
tenant_id=tenant_id,
|
|
ingredient_id=ingredient_id,
|
|
price_history=price_history,
|
|
forecast_horizon_days=forecast_horizon_days,
|
|
min_history_days=min_history_days
|
|
)
|
|
|
|
logger.info(
|
|
"Price forecasting complete",
|
|
ingredient_id=ingredient_id,
|
|
recommendation=forecast_results.get('recommendations', {}).get('action'),
|
|
insights_generated=len(forecast_results.get('insights', []))
|
|
)
|
|
|
|
# Step 2: Enrich insights with tenant_id and ingredient context
|
|
enriched_insights = self._enrich_insights(
|
|
forecast_results.get('insights', []),
|
|
tenant_id,
|
|
ingredient_id
|
|
)
|
|
|
|
# Step 3: Post insights to AI Insights Service
|
|
if enriched_insights:
|
|
post_results = await self.ai_insights_client.create_insights_bulk(
|
|
tenant_id=UUID(tenant_id),
|
|
insights=enriched_insights
|
|
)
|
|
|
|
logger.info(
|
|
"Price insights posted to AI Insights Service",
|
|
ingredient_id=ingredient_id,
|
|
total=post_results['total'],
|
|
successful=post_results['successful'],
|
|
failed=post_results['failed']
|
|
)
|
|
else:
|
|
post_results = {'total': 0, 'successful': 0, 'failed': 0}
|
|
logger.info("No insights to post for ingredient", ingredient_id=ingredient_id)
|
|
|
|
# Step 4: Return comprehensive results
|
|
return {
|
|
'tenant_id': tenant_id,
|
|
'ingredient_id': ingredient_id,
|
|
'forecasted_at': forecast_results['forecasted_at'],
|
|
'history_days': forecast_results['history_days'],
|
|
'forecast': forecast_results.get('forecast', {}),
|
|
'recommendation': forecast_results.get('recommendations', {}),
|
|
'bulk_opportunity': forecast_results.get('bulk_opportunities', {}),
|
|
'insights_generated': len(enriched_insights),
|
|
'insights_posted': post_results['successful'],
|
|
'insights_failed': post_results['failed'],
|
|
'created_insights': post_results.get('created_insights', [])
|
|
}
|
|
|
|
def _enrich_insights(
|
|
self,
|
|
insights: List[Dict[str, Any]],
|
|
tenant_id: str,
|
|
ingredient_id: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Enrich insights with required fields for AI Insights Service.
|
|
|
|
Args:
|
|
insights: Raw insights from forecaster
|
|
tenant_id: Tenant identifier
|
|
ingredient_id: Ingredient identifier
|
|
|
|
Returns:
|
|
Enriched insights ready for posting
|
|
"""
|
|
enriched = []
|
|
|
|
for insight in insights:
|
|
# Add required tenant_id
|
|
enriched_insight = insight.copy()
|
|
enriched_insight['tenant_id'] = tenant_id
|
|
|
|
# Add ingredient context to metrics
|
|
if 'metrics_json' not in enriched_insight:
|
|
enriched_insight['metrics_json'] = {}
|
|
|
|
enriched_insight['metrics_json']['ingredient_id'] = ingredient_id
|
|
|
|
# Add source metadata
|
|
enriched_insight['source_service'] = 'procurement'
|
|
enriched_insight['source_model'] = 'price_forecaster'
|
|
enriched_insight['detected_at'] = datetime.utcnow().isoformat()
|
|
|
|
enriched.append(enriched_insight)
|
|
|
|
return enriched
|
|
|
|
async def forecast_all_ingredients(
|
|
self,
|
|
tenant_id: str,
|
|
ingredients_data: Dict[str, pd.DataFrame],
|
|
forecast_horizon_days: int = 30,
|
|
min_history_days: int = 180
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Forecast prices for all ingredients for a tenant.
|
|
|
|
Args:
|
|
tenant_id: Tenant identifier
|
|
ingredients_data: Dict of {ingredient_id: price_history DataFrame}
|
|
forecast_horizon_days: Days to forecast
|
|
min_history_days: Minimum history required
|
|
|
|
Returns:
|
|
Comprehensive forecasting results
|
|
"""
|
|
logger.info(
|
|
"Forecasting prices for all ingredients",
|
|
tenant_id=tenant_id,
|
|
ingredients=len(ingredients_data)
|
|
)
|
|
|
|
all_results = []
|
|
total_insights_posted = 0
|
|
buy_now_count = 0
|
|
wait_count = 0
|
|
bulk_opportunity_count = 0
|
|
|
|
# Forecast each ingredient
|
|
for ingredient_id, price_history in ingredients_data.items():
|
|
try:
|
|
results = await self.forecast_and_post_insights(
|
|
tenant_id=tenant_id,
|
|
ingredient_id=ingredient_id,
|
|
price_history=price_history,
|
|
forecast_horizon_days=forecast_horizon_days,
|
|
min_history_days=min_history_days
|
|
)
|
|
|
|
all_results.append(results)
|
|
total_insights_posted += results['insights_posted']
|
|
|
|
# Count recommendations
|
|
action = results['recommendation'].get('action')
|
|
if action == 'buy_now':
|
|
buy_now_count += 1
|
|
elif action in ['wait', 'wait_for_dip']:
|
|
wait_count += 1
|
|
|
|
if results['bulk_opportunity'].get('has_bulk_opportunity'):
|
|
bulk_opportunity_count += 1
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error forecasting ingredient",
|
|
ingredient_id=ingredient_id,
|
|
error=str(e)
|
|
)
|
|
|
|
# Generate summary insight
|
|
if buy_now_count > 0 or bulk_opportunity_count > 0:
|
|
summary_insight = self._generate_portfolio_summary_insight(
|
|
tenant_id, all_results, buy_now_count, wait_count, bulk_opportunity_count
|
|
)
|
|
|
|
if summary_insight:
|
|
enriched_summary = self._enrich_insights(
|
|
[summary_insight], tenant_id, 'all_ingredients'
|
|
)
|
|
|
|
post_results = await self.ai_insights_client.create_insights_bulk(
|
|
tenant_id=UUID(tenant_id),
|
|
insights=enriched_summary
|
|
)
|
|
|
|
total_insights_posted += post_results['successful']
|
|
|
|
logger.info(
|
|
"All ingredients forecasting complete",
|
|
tenant_id=tenant_id,
|
|
ingredients_forecasted=len(all_results),
|
|
total_insights_posted=total_insights_posted,
|
|
buy_now_recommendations=buy_now_count,
|
|
bulk_opportunities=bulk_opportunity_count
|
|
)
|
|
|
|
return {
|
|
'tenant_id': tenant_id,
|
|
'forecasted_at': datetime.utcnow().isoformat(),
|
|
'ingredients_forecasted': len(all_results),
|
|
'ingredient_results': all_results,
|
|
'total_insights_posted': total_insights_posted,
|
|
'buy_now_count': buy_now_count,
|
|
'wait_count': wait_count,
|
|
'bulk_opportunity_count': bulk_opportunity_count
|
|
}
|
|
|
|
def _generate_portfolio_summary_insight(
|
|
self,
|
|
tenant_id: str,
|
|
all_results: List[Dict[str, Any]],
|
|
buy_now_count: int,
|
|
wait_count: int,
|
|
bulk_opportunity_count: int
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Generate portfolio-level summary insight.
|
|
|
|
Args:
|
|
tenant_id: Tenant identifier
|
|
all_results: All ingredient forecast results
|
|
buy_now_count: Number of buy now recommendations
|
|
wait_count: Number of wait recommendations
|
|
bulk_opportunity_count: Number of bulk opportunities
|
|
|
|
Returns:
|
|
Summary insight or None
|
|
"""
|
|
if buy_now_count == 0 and bulk_opportunity_count == 0:
|
|
return None
|
|
|
|
# Calculate potential savings from bulk opportunities
|
|
total_potential_savings = 0
|
|
for result in all_results:
|
|
bulk_opp = result.get('bulk_opportunity', {})
|
|
if bulk_opp.get('has_bulk_opportunity'):
|
|
# Estimate savings (simplified)
|
|
savings_pct = bulk_opp.get('potential_savings_pct', 0)
|
|
total_potential_savings += savings_pct
|
|
|
|
avg_potential_savings = total_potential_savings / max(1, bulk_opportunity_count)
|
|
|
|
description_parts = []
|
|
if buy_now_count > 0:
|
|
description_parts.append(f'{buy_now_count} ingredients show price increases - purchase soon')
|
|
if bulk_opportunity_count > 0:
|
|
description_parts.append(f'{bulk_opportunity_count} ingredients have bulk buying opportunities (avg {avg_potential_savings:.1f}% savings)')
|
|
|
|
return {
|
|
'type': 'recommendation',
|
|
'priority': 'high' if buy_now_count > 2 else 'medium',
|
|
'category': 'procurement',
|
|
'title': f'Procurement Timing Opportunities: {buy_now_count + bulk_opportunity_count} Items',
|
|
'description': 'Price forecast analysis identified procurement timing opportunities. ' + '. '.join(description_parts) + '.',
|
|
'impact_type': 'cost_optimization',
|
|
'impact_value': avg_potential_savings if bulk_opportunity_count > 0 else buy_now_count,
|
|
'impact_unit': 'percentage' if bulk_opportunity_count > 0 else 'items',
|
|
'confidence': 75,
|
|
'metrics_json': {
|
|
'ingredients_analyzed': len(all_results),
|
|
'buy_now_count': buy_now_count,
|
|
'wait_count': wait_count,
|
|
'bulk_opportunity_count': bulk_opportunity_count,
|
|
'avg_potential_savings_pct': round(avg_potential_savings, 2)
|
|
},
|
|
'actionable': True,
|
|
'recommendation_actions': [
|
|
{
|
|
'label': 'Review Price Forecasts',
|
|
'action': 'review_price_forecasts',
|
|
'params': {'tenant_id': tenant_id}
|
|
},
|
|
{
|
|
'label': 'Create Optimized Orders',
|
|
'action': 'create_optimized_purchase_orders',
|
|
'params': {'tenant_id': tenant_id}
|
|
}
|
|
],
|
|
'source_service': 'procurement',
|
|
'source_model': 'price_forecaster'
|
|
}
|
|
|
|
async def get_price_forecast(
|
|
self,
|
|
ingredient_id: str
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get cached seasonal patterns for an ingredient.
|
|
|
|
Args:
|
|
ingredient_id: Ingredient identifier
|
|
|
|
Returns:
|
|
Seasonal patterns or None if not forecasted
|
|
"""
|
|
return self.forecaster.get_seasonal_patterns(ingredient_id)
|
|
|
|
async def get_volatility_assessment(
|
|
self,
|
|
ingredient_id: str
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get cached volatility assessment for an ingredient.
|
|
|
|
Args:
|
|
ingredient_id: Ingredient identifier
|
|
|
|
Returns:
|
|
Volatility assessment or None if not assessed
|
|
"""
|
|
return self.forecaster.get_volatility_score(ingredient_id)
|
|
|
|
async def close(self):
|
|
"""Close HTTP client connections."""
|
|
await self.ai_insights_client.close()
|