diff --git a/CI_CD_IMPLEMENTATION_PLAN.md b/CI_CD_IMPLEMENTATION_PLAN.md
new file mode 100644
index 00000000..2cf75201
--- /dev/null
+++ b/CI_CD_IMPLEMENTATION_PLAN.md
@@ -0,0 +1,1206 @@
+# Bakery-IA Production CI/CD Implementation Plan
+
+## Document Overview
+**Status**: Draft
+**Version**: 1.0
+**Date**: 2024-07-15
+**Author**: Mistral Vibe
+
+This document outlines the production-grade CI/CD architecture for Bakery-IA and provides a step-by-step implementation plan without requiring immediate code changes.
+
+## Table of Contents
+1. [Current State Analysis](#current-state-analysis)
+2. [Target Architecture](#target-architecture)
+3. [Implementation Strategy](#implementation-strategy)
+4. [Phase 1: Infrastructure Setup](#phase-1-infrastructure-setup)
+5. [Phase 2: CI/CD Pipeline Configuration](#phase-2-cicd-pipeline-configuration)
+6. [Phase 3: Monitoring and Observability](#phase-3-monitoring-and-observability)
+7. [Phase 4: Testing and Validation](#phase-4-testing-and-validation)
+8. [Phase 5: Rollout and Migration](#phase-5-rollout-and-migration)
+9. [Risk Assessment](#risk-assessment)
+10. [Success Metrics](#success-metrics)
+11. [Appendices](#appendices)
+
+---
+
+## Current State Analysis
+
+### Existing Infrastructure
+- **Microservices**: 19 services in `services/` directory
+- **Frontend**: React application in `frontend/`
+- **Gateway**: API gateway in `gateway/`
+- **Databases**: 22 PostgreSQL instances + Redis + RabbitMQ
+- **Storage**: MinIO for object storage
+- **Monitoring**: SigNoz already deployed
+- **Target Platform**: MicroK8s on Clouding.io VPS
+
+### Current Deployment Process
+- Manual builds using Tiltfile/Skaffold (local only)
+- Manual image pushes to local registry or Docker Hub
+- Manual kubectl apply commands
+- No automated testing gates
+- No rollback mechanism
+
+### Pain Points
+- "Works on my machine" issues
+- No audit trail of deployments
+- Time-consuming manual processes
+- Risk of human error
+- No automated testing in pipeline
+
+---
+
+## Target Architecture
+
+### High-Level Architecture Diagram
+
+```mermaid
+graph TD
+ A[Developer Workstation] -->|Push Code| B[Gitea Git Server]
+ B -->|Webhook| C[Tekton Pipelines]
+ C -->|Build/Test| D[Gitea Container Registry]
+ D -->|New Image| E[Flux CD]
+ E -->|Git Commit| B
+ E -->|kubectl apply| F[MicroK8s Cluster]
+ F -->|Metrics/Logs| G[SigNoz Monitoring]
+```
+
+### How CI/CD Tools Run in Kubernetes
+These CI/CD tools are not external services: each one is a set of container images running as pods in your MicroK8s cluster, just like your application services.
+
+```mermaid
+graph TB
+ subgraph "MicroK8s Cluster (Your VPS)"
+ subgraph "Namespace: gitea"
+            A1["Pod: gitea<br/>Image: gitea/gitea:latest"]
+            A2["Pod: gitea-postgresql<br/>Image: postgres:15"]
+            A3[PVC: gitea-data]
+        end
+
+        subgraph "Namespace: tekton-pipelines"
+            B1["Pod: tekton-pipelines-controller<br/>Image: gcr.io/tekton-releases/..."]
+            B2["Pod: tekton-pipelines-webhook<br/>Image: gcr.io/tekton-releases/..."]
+            B3["Pod: tekton-triggers-controller<br/>Image: gcr.io/tekton-releases/..."]
+        end
+
+        subgraph "Namespace: flux-system"
+            C1["Pod: source-controller<br/>Image: ghcr.io/fluxcd/..."]
+            C2["Pod: kustomize-controller<br/>Image: ghcr.io/fluxcd/..."]
+            C3["Pod: helm-controller<br/>Image: ghcr.io/fluxcd/..."]
+        end
+ end
+
+ subgraph "Namespace: bakery-ia (YOUR APP)"
+ D1[19 services + 22 databases + Redis + RabbitMQ + MinIO]
+ end
+ end
+```
+
+### Component Breakdown
+
+#### 1. Gitea (Git Server + Registry)
+- **Purpose**: Replace GitHub dependency
+- **Namespace**: `gitea`
+- **Resources**: ~768MB RAM (512MB Gitea + 256MB PostgreSQL)
+- **Storage**: PVC for repositories and registry
+- **Access**: Internal DNS `gitea.bakery-ia.local`
+- **High Availability**: Runs as a single replica in this setup; Gitea itself does not rely on Kubernetes leader election
+
+#### 2. Tekton (CI Pipelines)
+- **Purpose**: Build, test, and push container images
+- **Namespace**: `tekton-pipelines`
+- **Resources**: ~650MB baseline + 512MB per build
+- **Key Features**:
+ - Path-based change detection
+ - Parallel builds for independent services
+ - Kaniko for in-cluster image building
+ - Integration with Gitea registry
+- **Leader Election**: Tekton controllers use Kubernetes leader election so that only one controller replica is active at a time
+
+#### 3. Flux CD (GitOps Deployment)
+- **Purpose**: Automated deployments from Git
+- **Namespace**: `flux-system`
+- **Resources**: ~230MB baseline
+- **Key Features**:
+ - Pull-based deployments (no webhooks needed)
+ - Kustomize support for your existing overlays
+ - Image automation for rolling updates
+ - Drift detection and correction
+- **Leader Election**: Flux controllers use leader election to ensure only one active replica per controller
+
+#### 4. SigNoz (Monitoring)
+- **Purpose**: Observability for CI/CD and applications
+- **Integration Points**:
+ - Tekton pipeline metrics
+ - Flux reconciliation events
+ - Kubernetes resource metrics
+ - Application performance monitoring
+
+### Deployment Methods for Each Tool
+
+#### 1. Flux (Easiest - Built into MicroK8s)
+
+```bash
+# One command - MicroK8s has it built-in
+microk8s enable fluxcd
+
+# This creates:
+# - Namespace: flux-system
+# - Deployments: source-controller, kustomize-controller, helm-controller, notification-controller
+# - CRDs: GitRepository, Kustomization, HelmRelease, etc.
+```
+
+**Images pulled:**
+- `ghcr.io/fluxcd/source-controller:v1.x.x`
+- `ghcr.io/fluxcd/kustomize-controller:v1.x.x`
+- `ghcr.io/fluxcd/helm-controller:v0.x.x`
+- `ghcr.io/fluxcd/notification-controller:v1.x.x`
+
+#### 2. Tekton (kubectl apply or Helm)
+
+```bash
+# Option A: Direct apply (official releases)
+kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml
+kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml
+kubectl apply -f https://storage.googleapis.com/tekton-releases/dashboard/latest/release.yaml
+
+# Option B: Community-maintained Helm chart (the kubectl apply method above is the
+# officially supported install; verify the chart repository for your Tekton version)
+helm repo add tekton https://tekton.dev/charts
+helm install tekton-pipelines tekton/tekton-pipelines -n tekton-pipelines --create-namespace
+```
+
+**Images pulled:**
+- `gcr.io/tekton-releases/github.com/tektoncd/pipeline/cmd/controller:v0.x.x`
+- `gcr.io/tekton-releases/github.com/tektoncd/pipeline/cmd/webhook:v0.x.x`
+- `gcr.io/tekton-releases/github.com/tektoncd/triggers/cmd/controller:v0.x.x`
+- `gcr.io/tekton-releases/github.com/tektoncd/dashboard/cmd/dashboard:v0.x.x`
+
+#### 3. Gitea (Helm chart)
+
+```bash
+# Add Helm repo
+helm repo add gitea https://dl.gitea.io/charts
+
+# Install with custom values
+helm install gitea gitea/gitea \
+ -n gitea --create-namespace \
+ -f gitea-values.yaml
+```
+
+**Images pulled:**
+- `gitea/gitea:1.x.x`
+- `postgres:15-alpine` (or bundled)
+
+---
+
+## Complete Deployment Architecture
+
+```mermaid
+graph TB
+    subgraph "Your Git Repository<br/>(Initially in GitHub, then Gitea)"
+        A["bakery-ia/<br/>├── services/<br/>├── frontend/<br/>├── gateway/<br/>├── infrastructure/<br/>│   ├── kubernetes/<br/>│   │   ├── base/<br/>│   │   └── overlays/<br/>│   │       ├── dev/<br/>│   │       └── prod/<br/>│   └── ci-cd/<br/>│       ├── gitea/<br/>│       ├── tekton/<br/>│       └── flux/<br/>└── tekton/<br/>    └── pipeline.yaml"]
+    end
+
+    A --> B["Gitea<br/>Self-hosted Git<br/>Stores code<br/>Triggers webhook"]
+
+    B --> C["Tekton<br/>EventListener<br/>TriggerTemplate<br/>PipelineRun"]
+
+    C --> D["Pipeline Steps<br/>├── clone<br/>├── detect changes<br/>├── test<br/>├── build<br/>└── push"]
+
+    D --> E["Gitea Registry<br/>gitea:5000/bakery/<br/>auth-service:abc123"]
+
+    E --> F["Flux<br/>source-controller<br/>kustomize-controller<br/>kubectl apply"]
+
+    F --> G["Your Application<br/>bakery-ia namespace<br/>Updated services"]
+```
+
+### Guiding Principles
+1. **No Code Changes Required**: Use existing codebase as-is
+2. **Incremental Rollout**: Phase-based implementation
+3. **Zero Downtime**: Parallel run with existing manual process
+4. **Observability First**: Monitor before automating
+5. **Security by Design**: Secrets management from day one
+
+### Implementation Phases
+
+```mermaid
+gantt
+ title CI/CD Implementation Timeline
+ dateFormat YYYY-MM-DD
+ section Phase 1: Infrastructure
+ Infrastructure Setup :a1, 2024-07-15, 7d
+ section Phase 2: CI/CD Config
+ Pipeline Configuration :a2, 2024-07-22, 10d
+ section Phase 3: Monitoring
+ SigNoz Integration :a3, 2024-08-01, 5d
+ section Phase 4: Testing
+ Validation Testing :a4, 2024-08-06, 7d
+ section Phase 5: Rollout
+ Production Migration :a5, 2024-08-13, 5d
+```
+
+## Step-by-Step: How to Deploy CI/CD to Production
+
+### Phase 1: Bootstrap (One-time setup on VPS)
+
+```bash
+# SSH to your VPS
+ssh user@your-clouding-vps
+
+# 1. Enable Flux (built into MicroK8s)
+microk8s enable fluxcd
+
+# 2. Install Tekton
+microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml
+microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml
+
+# 3. Install Gitea via Helm
+microk8s helm repo add gitea https://dl.gitea.io/charts
+microk8s helm install gitea gitea/gitea -n gitea --create-namespace -f gitea-values.yaml
+
+# 4. Verify all running
+microk8s kubectl get pods -A | grep -E "gitea|tekton|flux"
+```
+
+After this, you have:
+
+```
+NAMESPACE NAME READY STATUS
+gitea gitea-0 1/1 Running
+gitea gitea-postgresql-0 1/1 Running
+tekton-pipelines tekton-pipelines-controller-xxx 1/1 Running
+tekton-pipelines tekton-pipelines-webhook-xxx 1/1 Running
+tekton-pipelines tekton-triggers-controller-xxx 1/1 Running
+flux-system source-controller-xxx 1/1 Running
+flux-system kustomize-controller-xxx 1/1 Running
+flux-system helm-controller-xxx 1/1 Running
+```
+
+### Phase 2: Configure Flux to Watch Your Repo
+
+```yaml
+# infrastructure/ci-cd/flux/gitrepository.yaml
+apiVersion: source.toolkit.fluxcd.io/v1
+kind: GitRepository
+metadata:
+ name: bakery-ia
+ namespace: flux-system
+spec:
+ interval: 1m
+ url: https://gitea.bakery-ia.local/bakery/bakery-ia.git
+ ref:
+ branch: main
+ secretRef:
+ name: gitea-credentials # Git credentials
+---
+# infrastructure/ci-cd/flux/kustomization.yaml
+apiVersion: kustomize.toolkit.fluxcd.io/v1
+kind: Kustomization
+metadata:
+ name: bakery-ia-prod
+ namespace: flux-system
+spec:
+ interval: 5m
+ path: ./infrastructure/kubernetes/overlays/prod
+ prune: true
+ sourceRef:
+ kind: GitRepository
+ name: bakery-ia
+ targetNamespace: bakery-ia
+```
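+
+The `gitea-credentials` secret referenced by `secretRef` above must already exist in `flux-system`; a minimal sketch (Flux expects `username` and `password` keys for HTTP basic auth, and the values here are placeholders):
+
+```bash
+# Create the Git credentials secret the GitRepository points to
+microk8s kubectl create secret generic gitea-credentials \
+  -n flux-system \
+  --from-literal=username=<gitea-user> \
+  --from-literal=password=<gitea-token>
+```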
+
+### Phase 3: Configure Tekton Pipeline
+
+```yaml
+# tekton/pipeline.yaml
+apiVersion: tekton.dev/v1beta1
+kind: Pipeline
+metadata:
+ name: bakery-ia-ci
+ namespace: tekton-pipelines
+spec:
+ params:
+ - name: git-url
+ - name: git-revision
+ - name: changed-services
+ type: array
+
+ workspaces:
+ - name: source
+ - name: docker-credentials
+
+ tasks:
+ - name: clone
+ taskRef:
+ name: git-clone
+ workspaces:
+ - name: output
+ workspace: source
+ params:
+ - name: url
+ value: $(params.git-url)
+ - name: revision
+ value: $(params.git-revision)
+
+ - name: detect-changes
+ runAfter: [clone]
+ taskRef:
+ name: detect-changed-services
+ workspaces:
+ - name: source
+ workspace: source
+
+ - name: build-and-push
+ runAfter: [detect-changes]
+ taskRef:
+ name: kaniko-build
+ params:
+ - name: services
+ value: $(tasks.detect-changes.results.changed-services)
+ workspaces:
+ - name: source
+ workspace: source
+ - name: docker-credentials
+ workspace: docker-credentials
+```
+
+## Visual: Complete Production Flow
+
+```mermaid
+graph LR
+    A[Developer pushes code] --> B["Gitea<br/>Self-hosted Git<br/>• Receives push<br/>• Stores code<br/>• Triggers webhook"]
+
+    B -->|webhook POST to tekton-triggers| C["Tekton<br/>EventListener<br/>TriggerTemplate<br/>PipelineRun"]
+
+    C --> D["Pipeline Steps<br/>Each step = container in pod:<br/>├── clone<br/>├── detect changes<br/>├── test (pytest)<br/>├── build (kaniko)<br/>└── push (registry)"]
+
+    D --> E[Only changed services]
+
+    D --> F["Gitea Registry<br/>gitea:5000/bakery/<br/>auth-service:abc123"]
+
+    F -->|"Final step: Update image tag in Git<br/>commits new tag to infrastructure/kubernetes/overlays/prod"| G[Git commit triggers Flux]
+
+    G --> H["Flux<br/>source-controller<br/>kustomize-controller<br/>kubectl apply<br/>• Detects new image tag in Git<br/>• Renders Kustomize overlay<br/>• Applies to bakery-ia namespace<br/>• Rolling update of changed services"]
+
+    H --> I["Your Application<br/>Namespace: bakery-ia<br/>├── auth-service:abc123 ←NEW<br/>├── tenant-svc:def456<br/>└── training-svc:ghi789<br/>Only auth-service was updated (others unchanged)"]
+```
+
+## Where Images Come From
+
+| Component | Image Source | Notes |
+|-----------|--------------|-------|
+| Flux | ghcr.io/fluxcd/* | Pulled once, cached locally |
+| Tekton | gcr.io/tekton-releases/* | Pulled once, cached locally |
+| Gitea | gitea/gitea (Docker Hub) | Pulled once, cached locally |
+| Your Services | gitea.local:5000/bakery/* | Built by Tekton, stored in Gitea registry |
+| Build Tools | gcr.io/kaniko-project/executor | Used during builds only |
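+
+Once Gitea is up, the registry endpoint listed above can be sanity-checked from the VPS. A quick sketch (the hostname, port, and credentials are the placeholders used throughout this plan; adjust to how the registry is actually exposed):
+
+```bash
+# Log in to the in-cluster registry
+docker login gitea.bakery-ia.local:5000 -u <gitea-user>
+
+# List tags for one application image via the OCI distribution API
+curl -u <gitea-user>:<token> \
+  http://gitea.bakery-ia.local:5000/v2/bakery/auth-service/tags/list
+```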
+
+## Summary: What Lives Where
+
+```mermaid
+graph TB
+ subgraph "MicroK8s Cluster"
+        subgraph "Namespace: gitea (CI/CD Infrastructure)<br/>~768MB total"
+            A1[gitea pod ~512MB RAM]
+            A2[postgresql pod ~256MB RAM]
+        end
+
+        subgraph "Namespace: tekton-pipelines (CI/CD Infrastructure)<br/>~650MB baseline"
+            B1[pipelines-controller ~200MB RAM]
+            B2[pipelines-webhook ~100MB RAM]
+            B3[triggers-controller ~150MB RAM]
+            B4[triggers-webhook ~100MB RAM]
+        end
+
+        subgraph "Namespace: flux-system (CI/CD Infrastructure)<br/>~230MB baseline"
+ C1[source-controller ~50MB RAM]
+ C2[kustomize-controller ~50MB RAM]
+ C3[helm-controller ~50MB RAM]
+ C4[notification-controller ~30MB RAM]
+ end
+
+ subgraph "Namespace: bakery-ia (YOUR APPLICATION)"
+ D1[19 microservices]
+ D2[22 PostgreSQL databases]
+ D3[Redis]
+ D4[RabbitMQ]
+ D5[MinIO]
+ end
+ end
+
+ note1["CI/CD Total: ~1.5GB baseline"]
+ note2["During builds: +512MB per concurrent build (Tekton spawns pods)"]
+```
+
+### Key Points
+- Everything runs as pods - Gitea, Tekton, Flux are all containerized
+- Pulled from public registries once - then cached on your VPS
+- Your app images stay local - built by Tekton, stored in Gitea registry
+- No external dependencies after setup - fully self-contained
+- Flux pulls from Git - no incoming webhooks needed for deployments
+
+---
+
+## Phase 1: Infrastructure Setup
+
+### Objective
+Deploy CI/CD infrastructure components without affecting existing applications.
+
+### Step-by-Step Implementation
+
+#### Step 1: Prepare MicroK8s Cluster
+```bash
+# SSH to VPS
+ssh admin@bakery-ia-vps
+
+# Verify MicroK8s status
+microk8s status
+
+# Enable required addons
+microk8s enable dns storage ingress fluxcd
+
+# Verify storage class
+microk8s kubectl get storageclass
+```
+
+#### Step 2: Deploy Gitea
+
+**Create Gitea values file** (`infrastructure/ci-cd/gitea/values.yaml`):
+```yaml
+service:
+ type: ClusterIP
+ httpPort: 3000
+ sshPort: 2222
+
+persistence:
+ enabled: true
+ size: 50Gi
+ storageClass: "microk8s-hostpath"
+
+gitea:
+ config:
+ server:
+ DOMAIN: gitea.bakery-ia.local
+ SSH_DOMAIN: gitea.bakery-ia.local
+ ROOT_URL: http://gitea.bakery-ia.local
+ repository:
+ ENABLE_PUSH_CREATE_USER: true
+ ENABLE_PUSH_CREATE_ORG: true
+    packages:
+      ENABLED: true  # Gitea's container registry is part of the Packages feature
+
+postgresql:
+ enabled: true
+ persistence:
+ size: 20Gi
+```
+
+**Deploy Gitea**:
+```bash
+# Add Helm repo
+microk8s helm repo add gitea https://dl.gitea.io/charts
+
+# Create namespace
+microk8s kubectl create namespace gitea
+
+# Install Gitea
+microk8s helm install gitea gitea/gitea \
+ -n gitea \
+ -f infrastructure/ci-cd/gitea/values.yaml
+```
+
+**Verify Deployment**:
+```bash
+# Check pods
+microk8s kubectl get pods -n gitea
+
+# Get admin password
+microk8s kubectl get secret -n gitea gitea-admin-secret -o jsonpath='{.data.password}' | base64 -d
+```
+
+#### Step 3: Configure Ingress for Gitea
+
+**Create Ingress Resource** (`infrastructure/ci-cd/gitea/ingress.yaml`):
+```yaml
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+ name: gitea-ingress
+ namespace: gitea
+ annotations:
+ nginx.ingress.kubernetes.io/rewrite-target: /
+spec:
+ rules:
+ - host: gitea.bakery-ia.local
+ http:
+ paths:
+ - path: /
+ pathType: Prefix
+ backend:
+ service:
+ name: gitea-http
+ port:
+ number: 3000
+```
+
+**Apply Ingress**:
+```bash
+microk8s kubectl apply -f infrastructure/ci-cd/gitea/ingress.yaml
+```
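+
+Because `gitea.bakery-ia.local` is an internal name, it has to resolve to the ingress controller before clones, pushes, or webhooks will work. A minimal approach on the VPS itself (the address below is a placeholder; use the node or ingress IP that serves port 80):
+
+```bash
+# Confirm the ingress has been admitted and note its address
+microk8s kubectl get ingress -n gitea gitea-ingress
+
+# Map the hostname on the VPS (and on any workstation that pushes code)
+echo "<ingress-ip> gitea.bakery-ia.local" | sudo tee -a /etc/hosts
+```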
+
+#### Step 4: Migrate Repository from GitHub
+
+**Manual Migration Steps**:
+1. Create new repository in Gitea UI
+2. Use git mirror to push existing repo:
+```bash
+# Clone bare repo from GitHub
+git clone --bare git@github.com:your-org/bakery-ia.git
+
+# Push to Gitea
+cd bakery-ia.git
+git push --mirror http://admin:PASSWORD@gitea.bakery-ia.local/your-org/bakery-ia.git
+```
+
+#### Step 5: Deploy Tekton
+
+**Install Tekton Pipelines**:
+```bash
+# Create namespace
+microk8s kubectl create namespace tekton-pipelines
+
+# Install Tekton Pipelines
+microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml
+
+# Install Tekton Triggers
+microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml
+
+# Install Tekton Dashboard (optional)
+microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/dashboard/latest/release.yaml
+```
+
+**Verify Installation**:
+```bash
+microk8s kubectl get pods -n tekton-pipelines
+```
+
+#### Step 6: Configure Tekton for Gitea Integration
+
+**Create Gitea Webhook Secret**:
+```bash
+# Generate webhook secret
+WEBHOOK_SECRET=$(openssl rand -hex 20)
+
+# Create secret
+microk8s kubectl create secret generic gitea-webhook-secret \
+ -n tekton-pipelines \
+ --from-literal=secretToken=$WEBHOOK_SECRET
+```
+
+**Configure Gitea Webhook**:
+1. Go to Gitea repository settings
+2. Add webhook:
+   - URL: `http://el-bakery-ia-listener.tekton-pipelines.svc.cluster.local:8080` (the Service created for the EventListener defined in Phase 2; see the check below)
+ - Secret: Use the generated `WEBHOOK_SECRET`
+ - Trigger: Push events
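+
+Once the EventListener from Phase 2 is applied, Tekton Triggers exposes it through a Service named `el-<listener-name>`, which is what the webhook URL above targets. A quick reachability check (the test POST will be rejected without a valid payload, which is fine for connectivity testing):
+
+```bash
+# The EventListener named bakery-ia-listener is exposed as this Service
+microk8s kubectl get svc -n tekton-pipelines el-bakery-ia-listener
+
+# Optional: confirm the listener answers HTTP from inside the cluster
+microk8s kubectl run curl-test --rm -it --restart=Never --image=curlimages/curl --command -- \
+  curl -s -o /dev/null -w "%{http_code}\n" \
+  -X POST http://el-bakery-ia-listener.tekton-pipelines.svc.cluster.local:8080
+```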
+
+#### Step 7: Verify Flux Installation
+
+**Check Flux Components**:
+```bash
+microk8s kubectl get pods -n flux-system
+
+# Verify CRDs
+microk8s kubectl get crd | grep flux
+```
+
+---
+
+## Phase 2: CI/CD Pipeline Configuration
+
+### Objective
+Configure pipelines to build, test, and deploy services automatically.
+
+### Step-by-Step Implementation
+
+#### Step 1: Create Tekton Tasks
+
+**Git Clone Task** (`infrastructure/ci-cd/tekton/tasks/git-clone.yaml`):
+```yaml
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: git-clone
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: output
+ params:
+ - name: url
+ type: string
+ - name: revision
+ type: string
+ default: "main"
+ steps:
+ - name: clone
+ image: alpine/git
+ script: |
+ git clone $(params.url) $(workspaces.output.path)
+ cd $(workspaces.output.path)
+ git checkout $(params.revision)
+```
+
+**Detect Changed Services Task** (`infrastructure/ci-cd/tekton/tasks/detect-changes.yaml`):
+```yaml
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: detect-changed-services
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: source
+ results:
+ - name: changed-services
+ description: List of changed services
+ steps:
+ - name: detect
+ image: alpine/git
+ script: |
+ cd $(workspaces.source.path)
+        # Get the list of files changed in the last commit
+        CHANGED_FILES=$(git diff --name-only HEAD~1 HEAD)
+
+        # Map changed files to service directories and deduplicate
+        CHANGED_SERVICES=$(echo "$CHANGED_FILES" \
+          | grep '^services/' \
+          | cut -d'/' -f2 \
+          | sort -u \
+          | paste -sd ',' -)
+
+        # Emit "none" when nothing under services/ changed so the build task can be skipped
+        printf '%s' "${CHANGED_SERVICES:-none}" | tee $(results.changed-services.path)
+```
+
+**Kaniko Build Task** (`infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml`):
+```yaml
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: kaniko-build
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: source
+ - name: docker-credentials
+  params:
+    - name: services
+      type: string
+    - name: registry
+      type: string
+      default: "gitea.bakery-ia.local:5000"
+    - name: git-revision
+      type: string
+      default: "latest"
+  steps:
+    - name: build-and-push
+      image: gcr.io/kaniko-project/executor:v1.9.0
+      env:
+        # Kaniko reads registry auth from $DOCKER_CONFIG/config.json; the
+        # docker-credentials workspace must therefore expose a config.json file
+        - name: DOCKER_CONFIG
+          value: $(workspaces.docker-credentials.path)
+      args:
+        - --dockerfile=$(workspaces.source.path)/services/$(params.services)/Dockerfile
+        - --context=$(workspaces.source.path)
+        - --destination=$(params.registry)/bakery/$(params.services):$(params.git-revision)
+```
+
+#### Step 2: Create Tekton Pipeline
+
+**Main CI Pipeline** (`infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml`):
+```yaml
+apiVersion: tekton.dev/v1beta1
+kind: Pipeline
+metadata:
+ name: bakery-ia-ci
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: shared-workspace
+ - name: docker-credentials
+ params:
+ - name: git-url
+ type: string
+ - name: git-revision
+ type: string
+ tasks:
+ - name: fetch-source
+ taskRef:
+ name: git-clone
+ workspaces:
+ - name: output
+ workspace: shared-workspace
+ params:
+ - name: url
+ value: $(params.git-url)
+ - name: revision
+ value: $(params.git-revision)
+
+ - name: detect-changes
+ runAfter: [fetch-source]
+ taskRef:
+ name: detect-changed-services
+ workspaces:
+ - name: source
+ workspace: shared-workspace
+
+ - name: build-and-push
+ runAfter: [detect-changes]
+ taskRef:
+ name: kaniko-build
+ workspaces:
+ - name: source
+ workspace: shared-workspace
+ - name: docker-credentials
+ workspace: docker-credentials
+ params:
+ - name: services
+ value: $(tasks.detect-changes.results.changed-services)
+ - name: registry
+ value: "gitea.bakery-ia.local:5000"
+```
+
+#### Step 3: Create Tekton Trigger
+
+**Trigger Template** (`infrastructure/ci-cd/tekton/triggers/trigger-template.yaml`):
+```yaml
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: TriggerTemplate
+metadata:
+ name: bakery-ia-trigger-template
+ namespace: tekton-pipelines
+spec:
+ params:
+ - name: git-repo-url
+ - name: git-revision
+ resourcetemplates:
+ - apiVersion: tekton.dev/v1beta1
+ kind: PipelineRun
+ metadata:
+ generateName: bakery-ia-ci-run-
+ spec:
+ pipelineRef:
+ name: bakery-ia-ci
+ workspaces:
+ - name: shared-workspace
+ volumeClaimTemplate:
+ spec:
+ accessModes: ["ReadWriteOnce"]
+ resources:
+ requests:
+ storage: 1Gi
+ - name: docker-credentials
+ secret:
+ secretName: gitea-registry-credentials
+ params:
+ - name: git-url
+ value: $(params.git-repo-url)
+ - name: git-revision
+ value: $(params.git-revision)
+```
+
+**Trigger Binding** (`infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml`):
+```yaml
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: TriggerBinding
+metadata:
+ name: bakery-ia-trigger-binding
+ namespace: tekton-pipelines
+spec:
+ params:
+ - name: git-repo-url
+ value: $(body.repository.clone_url)
+ - name: git-revision
+ value: $(body.head_commit.id)
+```
+
+**Event Listener** (`infrastructure/ci-cd/tekton/triggers/event-listener.yaml`):
+```yaml
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: EventListener
+metadata:
+ name: bakery-ia-listener
+ namespace: tekton-pipelines
+spec:
+ serviceAccountName: tekton-triggers-sa
+ triggers:
+ - name: bakery-ia-trigger
+ bindings:
+ - ref: bakery-ia-trigger-binding
+ template:
+ ref: bakery-ia-trigger-template
+```
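+
+The `tekton-triggers-sa` ServiceAccount referenced above is not created by the Tekton release, so it needs to exist and carry the Triggers RBAC roles. A minimal sketch (the ClusterRole names are those shipped with recent Tekton Triggers releases; verify them against your installed version):
+
+```yaml
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: tekton-triggers-sa
+  namespace: tekton-pipelines
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: tekton-triggers-sa-roles
+  namespace: tekton-pipelines
+subjects:
+  - kind: ServiceAccount
+    name: tekton-triggers-sa
+    namespace: tekton-pipelines
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: tekton-triggers-eventlistener-roles
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: tekton-triggers-sa-clusterroles
+subjects:
+  - kind: ServiceAccount
+    name: tekton-triggers-sa
+    namespace: tekton-pipelines
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: tekton-triggers-eventlistener-clusterroles
+```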
+
+#### Step 4: Configure Flux for GitOps
+
+**Git Repository Source** (`infrastructure/ci-cd/flux/git-repository.yaml`):
+```yaml
+apiVersion: source.toolkit.fluxcd.io/v1
+kind: GitRepository
+metadata:
+ name: bakery-ia
+ namespace: flux-system
+spec:
+ interval: 1m
+ url: http://gitea.bakery-ia.local/your-org/bakery-ia.git
+ ref:
+ branch: main
+ secretRef:
+ name: gitea-credentials
+```
+
+**Kustomization for Production** (`infrastructure/ci-cd/flux/kustomization.yaml`):
+```yaml
+apiVersion: kustomize.toolkit.fluxcd.io/v1
+kind: Kustomization
+metadata:
+ name: bakery-ia-prod
+ namespace: flux-system
+spec:
+ interval: 5m
+ path: ./infrastructure/kubernetes/overlays/prod
+ prune: true
+ sourceRef:
+ kind: GitRepository
+ name: bakery-ia
+ targetNamespace: bakery-ia
+```
+
+#### Step 5: Apply All Configurations
+
+```bash
+# Apply Tekton tasks
+microk8s kubectl apply -f infrastructure/ci-cd/tekton/tasks/
+
+# Apply Tekton pipeline
+microk8s kubectl apply -f infrastructure/ci-cd/tekton/pipelines/
+
+# Apply Tekton triggers
+microk8s kubectl apply -f infrastructure/ci-cd/tekton/triggers/
+
+# Apply Flux configurations
+microk8s kubectl apply -f infrastructure/ci-cd/flux/
+```
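+
+A quick check that everything registered correctly after the apply:
+
+```bash
+# Tekton resources
+microk8s kubectl get tasks,pipelines,eventlisteners -n tekton-pipelines
+
+# Flux sources and their reconciliation status
+microk8s kubectl get gitrepositories,kustomizations -n flux-system
+```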
+
+---
+
+## Phase 3: Monitoring and Observability
+
+### Objective
+Integrate SigNoz with CI/CD pipelines for comprehensive monitoring.
+
+### Step-by-Step Implementation
+
+#### Step 1: Configure OpenTelemetry for Tekton
+
+**Install OpenTelemetry Collector** (`infrastructure/ci-cd/monitoring/otel-collector.yaml`):
+```yaml
+apiVersion: opentelemetry.io/v1alpha1
+kind: OpenTelemetryCollector
+metadata:
+ name: tekton-otel
+ namespace: tekton-pipelines
+spec:
+ config: |
+ receivers:
+ otlp:
+ protocols:
+ grpc:
+ http:
+ processors:
+ batch:
+ exporters:
+ otlp:
+ endpoint: "signoz-otel-collector.monitoring.svc.cluster.local:4317"
+ tls:
+ insecure: true
+ service:
+ pipelines:
+ traces:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [otlp]
+ metrics:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [otlp]
+```
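+
+Note that `OpenTelemetryCollector` is a CRD provided by the OpenTelemetry Operator, which the steps so far do not install. If the operator is not already present, a typical installation looks roughly like this (it depends on cert-manager being available):
+
+```bash
+# Enable cert-manager if it is not already running
+microk8s enable cert-manager
+
+# Install the OpenTelemetry Operator, which provides the OpenTelemetryCollector CRD
+microk8s kubectl apply -f https://github.com/open-telemetry/opentelemetry-operator/releases/latest/download/opentelemetry-operator.yaml
+```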
+
+**Apply Configuration**:
+```bash
+microk8s kubectl apply -f infrastructure/ci-cd/monitoring/otel-collector.yaml
+```
+
+#### Step 2: Instrument Tekton Pipelines
+
+**Add a tracing sidecar to a Task** (sidecars are defined on Tasks, not on Pipeline entries; for example in `git-clone.yaml`):
+```yaml
+spec:
+  steps:
+    - name: clone
+      # ... existing step definition ...
+  # OpenTelemetry sidecar runs alongside the step containers
+  sidecars:
+    - name: otel-collector
+      image: otel/opentelemetry-collector-contrib:0.70.0
+      args: ["--config=/etc/otel-collector-config.yaml"]
+      volumeMounts:
+        - name: otel-config
+          mountPath: /etc/otel-collector-config.yaml
+          subPath: otel-collector-config.yaml
+  volumes:
+    - name: otel-config
+      configMap:
+        name: otel-collector-config
+```
+
+#### Step 3: Configure SigNoz Dashboards
+
+**Create CI/CD Dashboard**:
+1. Log in to SigNoz UI
+2. Create new dashboard: "CI/CD Pipeline Metrics"
+3. Add panels:
+ - Pipeline execution time
+ - Success/failure rates
+ - Build duration by service
+ - Resource usage during builds
+
+**Create Deployment Dashboard**:
+1. Create dashboard: "GitOps Deployment Metrics"
+2. Add panels:
+ - Flux reconciliation events
+ - Deployment frequency
+ - Rollback events
+ - Resource changes
+
+---
+
+## Phase 4: Testing and Validation
+
+### Objective
+Validate CI/CD pipeline functionality without affecting production.
+
+### Test Plan
+
+#### Test 1: Gitea Functionality
+- **Test**: Push code to Gitea repository
+- **Expected**: Code appears in Gitea UI, webhook triggers
+- **Validation**:
+ ```bash
+ # Push test commit
+ cd bakery-ia
+  echo "test" > test-file.txt
+  git add test-file.txt
+  git commit -m "Test CI/CD"
+  git push origin main
+ ```
+
+#### Test 2: Tekton Pipeline Trigger
+- **Test**: Verify pipeline triggers on push
+- **Expected**: PipelineRun created in tekton-pipelines namespace
+- **Validation**:
+ ```bash
+ # Check PipelineRuns
+ microk8s kubectl get pipelineruns -n tekton-pipelines
+ ```
+
+#### Test 3: Change Detection
+- **Test**: Modify single service and verify only that service builds
+- **Expected**: Only changed service is built and pushed
+- **Validation**:
+ ```bash
+ # Check build logs
+  microk8s kubectl logs -n tekton-pipelines <pipelinerun-pod> -c step-build-and-push
+ ```
+
+#### Test 4: Image Registry
+- **Test**: Verify images pushed to Gitea registry
+- **Expected**: New image appears in registry
+- **Validation**:
+ ```bash
+ # List images in registry
+  curl -u <user>:<token> http://gitea.bakery-ia.local:5000/v2/bakery/auth-service/tags/list
+ ```
+
+#### Test 5: Flux Deployment
+- **Test**: Verify Flux detects and applies changes
+- **Expected**: New deployment in bakery-ia namespace
+- **Validation**:
+ ```bash
+ # Check Flux reconciliation
+ microk8s kubectl get kustomizations -n flux-system
+
+ # Check deployments
+ microk8s kubectl get deployments -n bakery-ia
+ ```
+
+#### Test 6: Rollback
+- **Test**: Verify rollback capability
+- **Expected**: Previous version redeployed successfully
+- **Validation**:
+ ```bash
+ # Rollback via Git
+  git revert HEAD --no-edit
+  git push origin main
+
+ # Verify rollback
+ microk8s kubectl get pods -n bakery-ia -w
+ ```
+
+---
+
+## Phase 5: Rollout and Migration
+
+### Objective
+Gradually migrate from manual to automated CI/CD.
+
+### Migration Strategy
+
+#### Step 1: Parallel Run
+- Run automated CI/CD alongside manual process
+- Compare results for 1 week
+- Monitor with SigNoz
+
+#### Step 2: Canary Deployment
+- Start with non-critical services:
+ - auth-service
+ - tenant-service
+ - training-service
+- Monitor stability and performance
+
+#### Step 3: Full Migration
+- Migrate all services to automated pipeline
+- Disable manual deployment scripts
+- Update documentation
+
+#### Step 4: Cleanup
+- Remove old Tiltfile/Skaffold configurations
+- Archive manual deployment scripts
+- Update team documentation
+
+---
+
+## Risk Assessment
+
+### Identified Risks
+
+| Risk | Likelihood | Impact | Mitigation Strategy |
+|------|------------|--------|---------------------|
+| Pipeline fails to detect changes | Medium | High | Manual override procedure, detailed logging |
+| Resource exhaustion during builds | High | Medium | Resource quotas, build queue limits |
+| Registry storage fills up | Medium | Medium | Automated cleanup policy, monitoring alerts |
+| Flux applies incorrect configuration | Low | High | Manual approval for first run, rollback testing |
+| Network issues between components | Medium | High | Health checks, retry logic |
+
+### Mitigation Plan
+
+1. **Resource Management**:
+   - Set resource quotas for CI/CD namespaces (see the quota sketch after this list)
+ - Limit concurrent builds to 2
+ - Monitor with SigNoz alerts
+
+2. **Backup Strategy**:
+ - Regular backups of Gitea (repos + registry)
+ - Backup Flux configurations
+ - Database backups for all services
+
+3. **Rollback Plan**:
+ - Document manual rollback procedures
+ - Test rollback for each service
+ - Maintain backup of manual deployment scripts
+
+4. **Monitoring Alerts**:
+ - Pipeline failure alerts
+ - Resource threshold alerts
+ - Deployment failure alerts
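+
+A minimal quota sketch for the build namespace, covering the limits above (the values are illustrative and should be tuned to the VPS capacity):
+
+```yaml
+apiVersion: v1
+kind: ResourceQuota
+metadata:
+  name: ci-build-quota
+  namespace: tekton-pipelines
+spec:
+  hard:
+    # Roughly two concurrent builds at ~512MB each plus controller overhead
+    requests.cpu: "2"
+    requests.memory: 2Gi
+    limits.cpu: "4"
+    limits.memory: 4Gi
+    pods: "20"
+```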
+
+---
+
+## Success Metrics
+
+### Quantitative Metrics
+1. **Deployment Frequency**: Increase from manual to automated deployments
+2. **Lead Time for Changes**: Reduce from hours to minutes
+3. **Change Failure Rate**: Maintain or reduce current rate
+4. **Mean Time to Recovery**: Improve with automated rollbacks
+5. **Resource Utilization**: Monitor CI/CD overhead (< 2GB baseline)
+
+### Qualitative Metrics
+1. **Developer Satisfaction**: Survey team on CI/CD experience
+2. **Deployment Confidence**: Reduced "works on my machine" issues
+3. **Auditability**: Full traceability of all deployments
+4. **Reliability**: Consistent deployment outcomes
+
+---
+
+## Appendices
+
+### Appendix A: Required Tools and Versions
+- MicroK8s: v1.27+
+- Gitea: v1.19+
+- Tekton Pipelines: v0.47+
+- Flux CD: v2.0+
+- SigNoz: v0.20+
+- Kaniko: v1.9+
+
+### Appendix B: Network Requirements
+- Internal DNS: `gitea.bakery-ia.local`
+- Ingress: Configured for Gitea and SigNoz
+- Network Policies: Allow communication between namespaces
+
+### Appendix C: Backup Procedures
+```bash
+# Backup Gitea
+microk8s kubectl exec -n gitea gitea-0 -- gitea dump -c /data/gitea/conf/app.ini
+
+# Backup Flux configurations
+microk8s kubectl get all -n flux-system -o yaml > flux-backup.yaml
+
+# Backup Tekton configurations
+microk8s kubectl get all -n tekton-pipelines -o yaml > tekton-backup.yaml
+```
+
+### Appendix D: Troubleshooting Guide
+
+**Issue: Pipeline not triggering**
+- Check Gitea webhook logs
+- Verify EventListener pods
+- Check TriggerBinding configuration
+
+**Issue: Build fails**
+- Check Kaniko logs
+- Verify Dockerfile paths
+- Ensure registry credentials are correct
+
+**Issue: Flux not applying changes**
+- Check GitRepository status
+- Verify Kustomization reconciliation
+- Check Flux logs
+
+---
+
+## Conclusion
+
+This implementation plan provides a clear path to transition from manual deployments to a fully automated, self-hosted CI/CD system. By following the phased approach, we minimize risk while maximizing the benefits of automation, observability, and reliability.
+
+### Next Steps
+1. Review and approve this plan
+2. Schedule Phase 1 implementation
+3. Assign team members to specific tasks
+4. Begin infrastructure setup
+
+**Approval**:
+- [ ] Team Lead
+- [ ] DevOps Engineer
+- [ ] Security Review
+
+**Implementation Start Date**: _______________
+**Target Completion Date**: _______________
diff --git a/INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md b/INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md
new file mode 100644
index 00000000..ef36620d
--- /dev/null
+++ b/INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md
@@ -0,0 +1,413 @@
+# Infrastructure Reorganization Proposal for Bakery-IA
+
+## Executive Summary
+
+This document presents a comprehensive analysis of the current infrastructure organization and proposes a restructured layout that improves maintainability, scalability, and operational efficiency. The proposal is based on a detailed examination of the existing 177 files across 31 directories in the infrastructure folder.
+
+## Current Infrastructure Analysis
+
+### Current Structure Overview
+
+```
+infrastructure/
+├── ci-cd/ # 18 files - CI/CD pipeline components
+├── helm/ # 8 files - Helm charts and scripts
+├── kubernetes/ # 103 files - Kubernetes manifests and configs
+├── signoz/ # 11 files - Monitoring dashboards and scripts
+└── tls/ # 37 files - TLS certificates and generation scripts
+```
+
+### Key Findings
+
+1. **Kubernetes Base Components (103 files)**: The most complex area with:
+ - 20+ service deployments across 15+ microservices
+ - 20+ database configurations (PostgreSQL, RabbitMQ, MinIO)
+ - 19 migration jobs for different services
+ - Infrastructure components (gateway, monitoring, etc.)
+
+2. **CI/CD Pipeline (18 files)**:
+ - Tekton tasks and pipelines for GitOps workflow
+ - Flux CD configuration for continuous delivery
+ - Gitea configuration for Git repository management
+
+3. **Monitoring (11 files)**:
+ - SigNoz dashboards for comprehensive observability
+ - Import scripts for dashboard management
+
+4. **TLS Certificates (37 files)**:
+ - CA certificates and generation scripts
+ - Service-specific certificates (PostgreSQL, Redis, MinIO)
+ - Certificate signing requests and configurations
+
+### Strengths of Current Organization
+
+1. **Logical Grouping**: Components are generally well-grouped by function
+2. **Base/Overlay Pattern**: Kubernetes uses proper base/overlay structure
+3. **Comprehensive Monitoring**: SigNoz dashboards cover all major aspects
+4. **Security Focus**: Dedicated TLS certificate management
+
+### Challenges Identified
+
+1. **Complexity in Kubernetes Base**: 103 files make navigation difficult
+2. **Mixed Component Types**: Services, databases, and infrastructure mixed together
+3. **Limited Environment Separation**: Only dev/prod overlays, no staging
+4. **Script Scattering**: Automation scripts spread across directories
+5. **Documentation Gaps**: Some components lack clear documentation
+
+## Proposed Infrastructure Organization
+
+### High-Level Structure
+
+```
+infrastructure/
+├── environments/ # Environment-specific configurations
+├── platform/ # Platform-level infrastructure
+├── services/ # Application services and microservices
+├── monitoring/ # Observability and monitoring
+├── cicd/ # CI/CD pipeline components
+├── security/ # Security configurations and certificates
+├── scripts/ # Automation and utility scripts
+├── docs/ # Infrastructure documentation
+└── README.md # Top-level infrastructure guide
+```
+
+### Detailed Structure Proposal
+
+```
+infrastructure/
+├── environments/ # Environment-specific configurations
+│ ├── dev/
+│ │ ├── k8s-manifests/
+│ │ │ ├── base/
+│ │ │ │ ├── namespace.yaml
+│ │ │ │ ├── configmap.yaml
+│ │ │ │ ├── secrets.yaml
+│ │ │ │ └── ingress-https.yaml
+│ │ │ ├── components/
+│ │ │ │ ├── databases/
+│ │ │ │ ├── infrastructure/
+│ │ │ │ ├── microservices/
+│ │ │ │ └── cert-manager/
+│ │ │ ├── configs/
+│ │ │ ├── cronjobs/
+│ │ │ ├── jobs/
+│ │ │ └── migrations/
+│ │ ├── kustomization.yaml
+│ │ └── values/
+│ ├── staging/ # New staging environment
+│ │ ├── k8s-manifests/
+│ │ └── values/
+│ └── prod/
+│ ├── k8s-manifests/
+│ ├── terraform/ # Production-specific IaC
+│ └── values/
+├── platform/ # Platform-level infrastructure
+│ ├── cluster/
+│ │ ├── eks/ # AWS EKS configuration
+│ │ │ ├── terraform/
+│ │ │ └── manifests/
+│ │ └── kind/ # Local development cluster
+│ │ ├── config.yaml
+│ │ └── manifests/
+│ ├── networking/
+│ │ ├── dns/
+│ │ ├── load-balancers/
+│ │ └── ingress/
+│ │ ├── nginx/
+│ │ └── cert-manager/
+│ ├── security/
+│ │ ├── rbac/
+│ │ ├── network-policies/
+│ │ └── tls/
+│ │ ├── ca/
+│ │ ├── postgres/
+│ │ ├── redis/
+│ │ └── minio/
+│ └── storage/
+│ ├── postgres/
+│ ├── redis/
+│ └── minio/
+├── services/ # Application services
+│ ├── databases/
+│ │ ├── postgres/
+│ │ │ ├── k8s-manifests/
+│ │ │ ├── backups/
+│ │ │ ├── monitoring/
+│ │ │ └── maintenance/
+│ │ ├── redis/
+│ │ │ ├── configs/
+│ │ │ └── monitoring/
+│ │ └── minio/
+│ │ ├── buckets/
+│ │ └── policies/
+│ ├── api-gateway/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ └── microservices/
+│ ├── auth/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── tenant/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── training/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── forecasting/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── sales/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── external/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── notification/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── inventory/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── recipes/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── suppliers/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── pos/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── orders/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── production/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── procurement/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── orchestrator/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── alert-processor/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── ai-insights/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ ├── demo-session/
+│ │ ├── k8s-manifests/
+│ │ └── configs/
+│ └── frontend/
+│ ├── k8s-manifests/
+│ └── configs/
+├── monitoring/ # Observability stack
+│ ├── signoz/
+│ │ ├── manifests/
+│ │ ├── dashboards/
+│ │ │ ├── alert-management.json
+│ │ │ ├── api-performance.json
+│ │ │ ├── application-performance.json
+│ │ │ ├── database-performance.json
+│ │ │ ├── error-tracking.json
+│ │ │ ├── index.json
+│ │ │ ├── infrastructure-monitoring.json
+│ │ │ ├── log-analysis.json
+│ │ │ ├── system-health.json
+│ │ │ └── user-activity.json
+│ │ ├── values-dev.yaml
+│ │ ├── values-prod.yaml
+│ │ ├── deploy-signoz.sh
+│ │ ├── verify-signoz.sh
+│ │ └── generate-test-traffic.sh
+│ └── opentelemetry/
+│ ├── collector/
+│ └── agent/
+├── cicd/ # CI/CD pipeline
+│ ├── gitea/
+│ │ ├── values.yaml
+│ │ └── ingress.yaml
+│ ├── tekton/
+│ │ ├── tasks/
+│ │ │ ├── git-clone.yaml
+│ │ │ ├── detect-changes.yaml
+│ │ │ ├── kaniko-build.yaml
+│ │ │ └── update-gitops.yaml
+│ │ ├── pipelines/
+│ │ └── triggers/
+│ └── flux/
+│ ├── git-repository.yaml
+│ └── kustomization.yaml
+├── security/ # Security configurations
+│ ├── policies/
+│ │ ├── network-policies.yaml
+│ │ ├── pod-security.yaml
+│ │ └── rbac.yaml
+│ ├── certificates/
+│ │ ├── ca/
+│ │ ├── services/
+│ │ └── rotation-scripts/
+│ ├── scanning/
+│ │ ├── trivy/
+│ │ └── policies/
+│ └── compliance/
+│ ├── cis-benchmarks/
+│ └── audit-scripts/
+├── scripts/ # Automation scripts
+│ ├── setup/
+│ │ ├── generate-certificates.sh
+│ │ ├── generate-minio-certificates.sh
+│ │ └── setup-dockerhub-secrets.sh
+│ ├── deployment/
+│ │ ├── deploy-signoz.sh
+│ │ └── verify-signoz.sh
+│ ├── maintenance/
+│ │ ├── regenerate_migrations_k8s.sh
+│ │ └── kubernetes_restart.sh
+│ └── verification/
+│ └── verify-registry.sh
+├── docs/ # Infrastructure documentation
+│ ├── architecture/
+│ │ ├── diagrams/
+│ │ └── decisions/
+│ ├── operations/
+│ │ ├── runbooks/
+│ │ └── troubleshooting/
+│ ├── onboarding/
+│ └── reference/
+│ ├── api/
+│ └── configurations/
+└── README.md
+```
+
+## Migration Strategy
+
+### Phase 1: Preparation and Planning
+
+1. **Inventory Analysis**: Complete detailed inventory of all current files
+2. **Dependency Mapping**: Identify dependencies between components
+3. **Impact Assessment**: Determine which components can be moved safely
+4. **Backup Strategy**: Ensure all files are backed up before migration
+
+### Phase 2: Non-Critical Components
+
+1. **Documentation**: Move and update all documentation files
+2. **Scripts**: Organize automation scripts into new structure
+3. **Monitoring**: Migrate SigNoz dashboards and configurations
+4. **CI/CD**: Reorganize pipeline components
+
+### Phase 3: Environment-Specific Components
+
+1. **Create Environment Structure**: Set up dev/staging/prod directories
+2. **Migrate Kubernetes Manifests**: Move base components to appropriate locations
+3. **Update References**: Ensure all cross-references are corrected
+4. **Environment Validation**: Test each environment separately
+
+### Phase 4: Service Components
+
+1. **Database Migration**: Move database configurations to services/databases
+2. **Microservice Organization**: Group microservices by domain
+3. **Infrastructure Components**: Move gateway and other infrastructure
+4. **Service Validation**: Test each service in isolation
+
+### Phase 5: Finalization
+
+1. **Integration Testing**: Test complete infrastructure workflow
+2. **Documentation Update**: Finalize all documentation
+3. **Team Training**: Conduct training on new structure
+4. **Cleanup**: Remove old structure and temporary files
+
+## Benefits of Proposed Structure
+
+### 1. Improved Navigation
+- **Clear Hierarchy**: Logical grouping by function and environment
+- **Consistent Patterns**: Standardized structure across all components
+- **Reduced Cognitive Load**: Easier to find specific components
+
+### 2. Enhanced Maintainability
+- **Environment Isolation**: Clear separation of dev/staging/prod
+- **Component Grouping**: Related components grouped together
+- **Standardized Structure**: Consistent patterns across services
+
+### 3. Better Scalability
+- **Modular Design**: Easy to add new services or environments
+- **Domain Separation**: Services organized by business domain
+- **Infrastructure Independence**: Platform components separate from services
+
+### 4. Improved Security
+- **Centralized Security**: All security configurations in one place
+- **Environment-Specific Policies**: Tailored security for each environment
+- **Better Secret Management**: Clear structure for sensitive data
+
+### 5. Enhanced Observability
+- **Comprehensive Monitoring**: All observability tools grouped
+- **Standardized Dashboards**: Consistent monitoring across services
+- **Centralized Logging**: Better log management structure
+
+## Implementation Considerations
+
+### Tools and Technologies
+- **Terraform**: For infrastructure as code (IaC)
+- **Kustomize**: For Kubernetes manifest management
+- **Helm**: For complex application deployments
+- **SOPS/Sealed Secrets**: For secret management
+- **Trivy**: For vulnerability scanning
+
+### Team Adaptation
+- **Training Plan**: Develop comprehensive training materials
+- **Migration Guide**: Create step-by-step migration documentation
+- **Support Period**: Provide dedicated support during transition
+- **Feedback Mechanism**: Establish channels for team feedback
+
+### Risk Mitigation
+- **Phased Approach**: Implement changes incrementally
+- **Rollback Plan**: Develop comprehensive rollback procedures
+- **Testing Strategy**: Implement thorough testing at each phase
+- **Monitoring**: Enhanced monitoring during migration period
+
+## Expected Outcomes
+
+1. **Reduced Time-to-Find**: 40-60% reduction in time spent locating files
+2. **Improved Deployment Speed**: 25-35% faster deployment cycles
+3. **Enhanced Collaboration**: Better team coordination and understanding
+4. **Reduced Errors**: 30-50% reduction in configuration errors
+5. **Better Scalability**: Easier to add new services and features
+
+## Conclusion
+
+The proposed infrastructure reorganization represents a significant improvement over the current structure. By implementing a clear, logical hierarchy with proper separation of concerns, the new organization will:
+
+- **Improve operational efficiency** through better navigation and maintainability
+- **Enhance security** with centralized security management
+- **Support growth** with a scalable, modular design
+- **Reduce errors** through standardized patterns and structures
+- **Facilitate collaboration** with intuitive organization
+
+The key to successful implementation is a phased approach with thorough testing and team involvement at each stage. With proper planning and execution, this reorganization will provide long-term benefits for the Bakery-IA project's infrastructure management.
+
+## Appendix: File Migration Mapping
+
+### Current → Proposed Mapping
+
+**Kubernetes Components:**
+- `infrastructure/kubernetes/base/components/*` → `infrastructure/services/microservices/*/`
+- `infrastructure/kubernetes/base/components/databases/*` → `infrastructure/services/databases/*/`
+- `infrastructure/kubernetes/base/migrations/*` → `infrastructure/services/microservices/*/migrations/`
+- `infrastructure/kubernetes/base/configs/*` → `infrastructure/environments/*/values/`
+
+**CI/CD Components:**
+- `infrastructure/ci-cd/*` → `infrastructure/cicd/*/`
+
+**Monitoring Components:**
+- `infrastructure/signoz/*` → `infrastructure/monitoring/signoz/*/`
+- `infrastructure/helm/*` → `infrastructure/monitoring/signoz/*/` (signoz-related)
+
+**Security Components:**
+- `infrastructure/tls/*` → `infrastructure/security/certificates/*/`
+
+**Scripts:**
+- `infrastructure/kubernetes/*.sh` → `infrastructure/scripts/*/`
+- `infrastructure/helm/*.sh` → `infrastructure/scripts/deployment/*/`
+- `infrastructure/tls/*.sh` → `infrastructure/scripts/setup/*/`
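+
+Where possible, moves should be performed with `git mv` so file history is preserved. A representative sketch for one of the mappings above (paths follow the proposal and are illustrative):
+
+```bash
+# Move the SigNoz assets into the new monitoring tree, preserving Git history
+mkdir -p infrastructure/monitoring
+git mv infrastructure/signoz infrastructure/monitoring/signoz
+git commit -m "Reorganize: move SigNoz assets under infrastructure/monitoring"
+```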
+
+This mapping provides a clear path for migrating each component to its new location while maintaining functionality and relationships between components.
\ No newline at end of file
diff --git a/infrastructure/ci-cd/README.md b/infrastructure/ci-cd/README.md
new file mode 100644
index 00000000..ebdb18dd
--- /dev/null
+++ b/infrastructure/ci-cd/README.md
@@ -0,0 +1,294 @@
+# Bakery-IA CI/CD Implementation
+
+This directory contains the configuration for the production-grade CI/CD system for Bakery-IA using Gitea, Tekton, and Flux CD.
+
+## Architecture Overview
+
+```mermaid
+graph TD
+ A[Developer] -->|Push Code| B[Gitea]
+ B -->|Webhook| C[Tekton Pipelines]
+ C -->|Build/Test| D[Gitea Registry]
+ D -->|New Image| E[Flux CD]
+ E -->|kubectl apply| F[MicroK8s Cluster]
+ F -->|Metrics| G[SigNoz]
+```
+
+## Directory Structure
+
+```
+infrastructure/ci-cd/
+├── gitea/ # Gitea configuration (Git server + registry)
+│ ├── values.yaml # Helm values for Gitea
+│ └── ingress.yaml # Ingress configuration
+├── tekton/ # Tekton CI/CD pipeline configuration
+│ ├── tasks/ # Individual pipeline tasks
+│ │ ├── git-clone.yaml
+│ │ ├── detect-changes.yaml
+│ │ ├── kaniko-build.yaml
+│ │ └── update-gitops.yaml
+│ ├── pipelines/ # Pipeline definitions
+│ │ └── ci-pipeline.yaml
+│ └── triggers/ # Webhook trigger configuration
+│ ├── trigger-template.yaml
+│ ├── trigger-binding.yaml
+│ ├── event-listener.yaml
+│ └── gitlab-interceptor.yaml
+├── flux/ # Flux CD GitOps configuration
+│ ├── git-repository.yaml # Git repository source
+│ └── kustomization.yaml # Deployment kustomization
+├── monitoring/ # Monitoring configuration
+│ └── otel-collector.yaml # OpenTelemetry collector
+└── README.md # This file
+```
+
+## Deployment Instructions
+
+### Phase 1: Infrastructure Setup
+
+1. **Deploy Gitea**:
+ ```bash
+ # Add Helm repo
+ microk8s helm repo add gitea https://dl.gitea.io/charts
+
+ # Create namespace
+ microk8s kubectl create namespace gitea
+
+ # Install Gitea
+ microk8s helm install gitea gitea/gitea \
+ -n gitea \
+ -f infrastructure/ci-cd/gitea/values.yaml
+
+ # Apply ingress
+ microk8s kubectl apply -f infrastructure/ci-cd/gitea/ingress.yaml
+ ```
+
+2. **Deploy Tekton**:
+ ```bash
+ # Create namespace
+ microk8s kubectl create namespace tekton-pipelines
+
+ # Install Tekton Pipelines
+ microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml
+
+ # Install Tekton Triggers
+ microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml
+
+ # Apply Tekton configurations
+ microk8s kubectl apply -f infrastructure/ci-cd/tekton/tasks/
+ microk8s kubectl apply -f infrastructure/ci-cd/tekton/pipelines/
+ microk8s kubectl apply -f infrastructure/ci-cd/tekton/triggers/
+ ```
+
+3. **Deploy Flux CD** (already enabled in MicroK8s):
+ ```bash
+ # Verify Flux installation
+ microk8s kubectl get pods -n flux-system
+
+ # Apply Flux configurations
+ microk8s kubectl apply -f infrastructure/ci-cd/flux/
+ ```
+
+### Phase 2: Configuration
+
+1. **Set up Gitea webhook**:
+ - Go to your Gitea repository settings
+   - Add webhook with URL: `http://el-bakery-ia-listener.tekton-pipelines.svc.cluster.local:8080` (the Service created by the `bakery-ia-listener` EventListener)
+ - Use the secret from `gitea-webhook-secret`
+
+2. **Configure registry credentials**:
+ ```bash
+ # Create registry credentials secret
+ microk8s kubectl create secret docker-registry gitea-registry-credentials \
+ -n tekton-pipelines \
+ --docker-server=gitea.bakery-ia.local:5000 \
+ --docker-username=your-username \
+ --docker-password=your-password
+ ```
+
+3. **Configure Git credentials for Flux**:
+ ```bash
+ # Create Git credentials secret
+ microk8s kubectl create secret generic gitea-credentials \
+ -n flux-system \
+ --from-literal=username=your-username \
+ --from-literal=password=your-password
+ ```
+
+### Phase 3: Monitoring
+
+```bash
+# Apply OpenTelemetry configuration
+microk8s kubectl apply -f infrastructure/ci-cd/monitoring/otel-collector.yaml
+```
+
+## Usage
+
+### Triggering a Pipeline
+
+1. **Manual trigger**:
+   ```bash
+   # Create a PipelineRun manually (a full manifest sketch is shown below)
+   microk8s kubectl create -f pipelinerun.yaml
+   ```
+
+2. **Monitoring a run**:
+   ```bash
+   # Follow the logs of a step container in the PipelineRun pod
+   microk8s kubectl logs -n tekton-pipelines <pipelinerun-pod> -c <step-container>
+
+   # View Tekton dashboard
+   microk8s kubectl port-forward -n tekton-pipelines svc/tekton-dashboard 9097:9097
+   ```
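+
+A minimal `pipelinerun.yaml` for the manual trigger might look like the following sketch (parameter values are illustrative; the workspaces mirror the trigger template):
+
+```yaml
+apiVersion: tekton.dev/v1beta1
+kind: PipelineRun
+metadata:
+  generateName: bakery-ia-ci-run-
+  namespace: tekton-pipelines
+spec:
+  pipelineRef:
+    name: bakery-ia-ci
+  params:
+    - name: git-url
+      value: http://gitea.bakery-ia.local/bakery/bakery-ia.git
+    - name: git-revision
+      value: main
+  workspaces:
+    - name: shared-workspace
+      volumeClaimTemplate:
+        spec:
+          accessModes: ["ReadWriteOnce"]
+          resources:
+            requests:
+              storage: 1Gi
+    - name: docker-credentials
+      secret:
+        secretName: gitea-registry-credentials
+```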
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Pipeline not triggering**:
+ - Check Gitea webhook logs
+ - Verify EventListener pods are running
+ - Check TriggerBinding configuration
+
+2. **Build failures**:
+ - Check Kaniko logs for build errors
+ - Verify Dockerfile paths are correct
+ - Ensure registry credentials are valid
+
+3. **Flux not applying changes**:
+ - Check GitRepository status
+ - Verify Kustomization reconciliation
+ - Check Flux logs for errors
+
+### Debugging Commands
+
+```bash
+# Check Tekton controller logs
+microk8s kubectl logs -n tekton-pipelines -l app=tekton-pipelines-controller
+
+# Check Flux reconciliation
+microk8s kubectl get kustomizations -n flux-system -o yaml
+
+# Check Gitea webhook delivery
+microk8s kubectl logs -n tekton-pipelines -l app=tekton-triggers-controller
+```
+
+## Security Considerations
+
+1. **Secrets Management**:
+ - Use Kubernetes secrets for sensitive data
+ - Rotate credentials regularly
+ - Use RBAC for namespace isolation
+
+2. **Network Security**:
+ - Configure network policies
+ - Use internal DNS names
+ - Restrict ingress access
+
+3. **Registry Security**:
+ - Enable image scanning
+ - Use image signing
+ - Implement cleanup policies
+
+## Maintenance
+
+### Upgrading Components
+
+```bash
+# Upgrade Tekton
+microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml
+
+# Upgrade Flux (if installed via the fluxcd-community Helm chart; when using the MicroK8s addon, Flux is upgraded with MicroK8s itself)
+microk8s helm upgrade fluxcd fluxcd/flux2 -n flux-system
+
+# Upgrade Gitea
+microk8s helm upgrade gitea gitea/gitea -n gitea -f infrastructure/ci-cd/gitea/values.yaml
+```
+
+### Backup Procedures
+
+```bash
+# Backup Gitea
+microk8s kubectl exec -n gitea gitea-0 -- gitea dump -c /data/gitea/conf/app.ini
+
+# Backup Flux configurations
+microk8s kubectl get all -n flux-system -o yaml > flux-backup.yaml
+
+# Backup Tekton configurations
+microk8s kubectl get all -n tekton-pipelines -o yaml > tekton-backup.yaml
+```
+
+## Performance Optimization
+
+1. **Resource Management**:
+ - Set appropriate resource limits
+ - Limit concurrent builds
+ - Use node selectors for build pods
+
+2. **Caching**:
+   - Configure Kaniko cache (see the sketch after this list)
+ - Use persistent volumes for dependencies
+ - Cache Docker layers
+
+3. **Parallelization**:
+ - Build independent services in parallel
+ - Use matrix builds for different architectures
+ - Optimize task dependencies
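+
+Kaniko layer caching is enabled through executor flags; a sketch of the extra args for the build step in `kaniko-build.yaml` (the cache repository path is an assumption):
+
+```yaml
+# Additional Kaniko executor args enabling layer caching
+args:
+  - --cache=true
+  - --cache-repo=gitea.bakery-ia.local:5000/bakery/kaniko-cache
+  - --cache-ttl=168h
+```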
+
+## Integration with Existing System
+
+The CI/CD system integrates with:
+- **SigNoz**: For monitoring and observability
+- **MicroK8s**: For cluster management
+- **Existing Kubernetes manifests**: In `infrastructure/kubernetes/`
+- **Current services**: All 19 microservices in `services/`
+
+## Migration Plan
+
+1. **Phase 1**: Set up infrastructure (Gitea, Tekton, Flux)
+2. **Phase 2**: Configure pipelines and triggers
+3. **Phase 3**: Test with non-critical services
+4. **Phase 4**: Gradual rollout to all services
+5. **Phase 5**: Decommission old deployment methods
+
+## Support
+
+For issues with the CI/CD system:
+- Check logs and monitoring first
+- Review the troubleshooting section
+- Consult the original implementation plan
+- Refer to component documentation:
+ - [Tekton Documentation](https://tekton.dev/docs/)
+ - [Flux CD Documentation](https://fluxcd.io/docs/)
+ - [Gitea Documentation](https://docs.gitea.io/)
\ No newline at end of file
diff --git a/infrastructure/ci-cd/flux/git-repository.yaml b/infrastructure/ci-cd/flux/git-repository.yaml
new file mode 100644
index 00000000..68eb46af
--- /dev/null
+++ b/infrastructure/ci-cd/flux/git-repository.yaml
@@ -0,0 +1,16 @@
+# Flux GitRepository for Bakery-IA
+# This resource tells Flux where to find the Git repository
+
+apiVersion: source.toolkit.fluxcd.io/v1
+kind: GitRepository
+metadata:
+ name: bakery-ia
+ namespace: flux-system
+spec:
+ interval: 1m
+ url: http://gitea.bakery-ia.local/bakery/bakery-ia.git
+ ref:
+ branch: main
+ secretRef:
+ name: gitea-credentials
+ timeout: 60s
\ No newline at end of file
diff --git a/infrastructure/ci-cd/flux/kustomization.yaml b/infrastructure/ci-cd/flux/kustomization.yaml
new file mode 100644
index 00000000..37e9df54
--- /dev/null
+++ b/infrastructure/ci-cd/flux/kustomization.yaml
@@ -0,0 +1,27 @@
+# Flux Kustomization for Bakery-IA Production Deployment
+# This resource tells Flux how to deploy the application
+
+apiVersion: kustomize.toolkit.fluxcd.io/v1
+kind: Kustomization
+metadata:
+ name: bakery-ia-prod
+ namespace: flux-system
+spec:
+ interval: 5m
+ path: ./infrastructure/kubernetes/overlays/prod
+ prune: true
+ sourceRef:
+ kind: GitRepository
+ name: bakery-ia
+ targetNamespace: bakery-ia
+ timeout: 5m
+ retryInterval: 1m
+ healthChecks:
+ - apiVersion: apps/v1
+ kind: Deployment
+ name: auth-service
+ namespace: bakery-ia
+ - apiVersion: apps/v1
+ kind: Deployment
+ name: gateway
+ namespace: bakery-ia
\ No newline at end of file
diff --git a/infrastructure/ci-cd/gitea/ingress.yaml b/infrastructure/ci-cd/gitea/ingress.yaml
new file mode 100644
index 00000000..ecfbc9d5
--- /dev/null
+++ b/infrastructure/ci-cd/gitea/ingress.yaml
@@ -0,0 +1,25 @@
+# Gitea Ingress configuration for Bakery-IA CI/CD
+# This provides external access to Gitea within the cluster
+
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+ name: gitea-ingress
+ namespace: gitea
+ annotations:
+ nginx.ingress.kubernetes.io/rewrite-target: /
+ nginx.ingress.kubernetes.io/proxy-body-size: "0"
+ nginx.ingress.kubernetes.io/proxy-read-timeout: "600"
+ nginx.ingress.kubernetes.io/proxy-send-timeout: "600"
+spec:
+ rules:
+ - host: gitea.bakery-ia.local
+ http:
+ paths:
+ - path: /
+ pathType: Prefix
+ backend:
+ service:
+ name: gitea-http
+ port:
+ number: 3000
\ No newline at end of file
diff --git a/infrastructure/ci-cd/gitea/values.yaml b/infrastructure/ci-cd/gitea/values.yaml
new file mode 100644
index 00000000..f94929e8
--- /dev/null
+++ b/infrastructure/ci-cd/gitea/values.yaml
@@ -0,0 +1,38 @@
+# Gitea Helm values configuration for Bakery-IA CI/CD
+# This configuration sets up Gitea with registry support and appropriate storage
+
+service:
+ type: ClusterIP
+ httpPort: 3000
+ sshPort: 2222
+
+persistence:
+ enabled: true
+ size: 50Gi
+ storageClass: "microk8s-hostpath"
+
+gitea:
+ config:
+ server:
+ DOMAIN: gitea.bakery-ia.local
+ SSH_DOMAIN: gitea.bakery-ia.local
+ ROOT_URL: http://gitea.bakery-ia.local
+ repository:
+ ENABLE_PUSH_CREATE_USER: true
+ ENABLE_PUSH_CREATE_ORG: true
+    packages:
+      ENABLED: true  # Gitea's container registry is part of the Packages feature
+
+postgresql:
+ enabled: true
+ persistence:
+ size: 20Gi
+
+# Resource configuration for production environment
+resources:
+ limits:
+ cpu: 1000m
+ memory: 1Gi
+ requests:
+ cpu: 500m
+ memory: 512Mi
\ No newline at end of file
diff --git a/infrastructure/ci-cd/monitoring/otel-collector.yaml b/infrastructure/ci-cd/monitoring/otel-collector.yaml
new file mode 100644
index 00000000..a8634707
--- /dev/null
+++ b/infrastructure/ci-cd/monitoring/otel-collector.yaml
@@ -0,0 +1,70 @@
+# OpenTelemetry Collector for Bakery-IA CI/CD Monitoring
+# This collects metrics and traces from Tekton pipelines
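+#
+# NOTE: the OpenTelemetryCollector CRD is provided by the OpenTelemetry Operator,
+# which must be installed in the cluster first. A quick sanity check:
+#   kubectl get crd opentelemetrycollectors.opentelemetry.io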
+
+apiVersion: opentelemetry.io/v1alpha1
+kind: OpenTelemetryCollector
+metadata:
+ name: tekton-otel
+ namespace: tekton-pipelines
+spec:
+ config: |
+ receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+ http:
+ endpoint: 0.0.0.0:4318
+ prometheus:
+ config:
+ scrape_configs:
+ - job_name: 'tekton-pipelines'
+ scrape_interval: 30s
+ static_configs:
+ - targets: ['tekton-pipelines-controller.tekton-pipelines.svc.cluster.local:9090']
+
+ processors:
+ batch:
+ timeout: 5s
+ send_batch_size: 1000
+ memory_limiter:
+ check_interval: 2s
+ limit_percentage: 75
+ spike_limit_percentage: 20
+
+ exporters:
+ otlp:
+ endpoint: "signoz-otel-collector.monitoring.svc.cluster.local:4317"
+ tls:
+ insecure: true
+ retry_on_failure:
+ enabled: true
+ initial_interval: 5s
+ max_interval: 30s
+ max_elapsed_time: 300s
+ logging:
+        loglevel: debug
+
+ service:
+ pipelines:
+ traces:
+ receivers: [otlp]
+ processors: [memory_limiter, batch]
+ exporters: [otlp, logging]
+ metrics:
+ receivers: [otlp, prometheus]
+ processors: [memory_limiter, batch]
+ exporters: [otlp, logging]
+ telemetry:
+ logs:
+ level: "info"
+ encoding: "json"
+
+ mode: deployment
+ resources:
+ limits:
+ cpu: 500m
+ memory: 512Mi
+ requests:
+ cpu: 200m
+ memory: 256Mi
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml b/infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml
new file mode 100644
index 00000000..c20068b2
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml
@@ -0,0 +1,83 @@
+# Main CI Pipeline for Bakery-IA
+# This pipeline orchestrates the build, test, and deploy process
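+#
+# For a manual test run outside the webhook trigger, the tkn CLI can be used,
+# e.g. (workspace PVC name is a placeholder):
+#   tkn pipeline start bakery-ia-ci -n tekton-pipelines \
+#     -p git-url=http://gitea.bakery-ia.local/bakery/bakery-ia.git \
+#     -p git-revision=main \
+#     -w name=shared-workspace,claimName=<pvc-name> \
+#     -w name=docker-credentials,secret=gitea-registry-credentials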
+
+apiVersion: tekton.dev/v1beta1
+kind: Pipeline
+metadata:
+ name: bakery-ia-ci
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: shared-workspace
+ - name: docker-credentials
+ params:
+ - name: git-url
+ type: string
+ description: Repository URL
+ - name: git-revision
+ type: string
+ description: Git revision/commit hash
+ - name: registry
+ type: string
+ description: Container registry URL
+ default: "gitea.bakery-ia.local:5000"
+ tasks:
+ - name: fetch-source
+ taskRef:
+ name: git-clone
+ workspaces:
+ - name: output
+ workspace: shared-workspace
+ params:
+ - name: url
+ value: $(params.git-url)
+ - name: revision
+ value: $(params.git-revision)
+
+ - name: detect-changes
+ runAfter: [fetch-source]
+ taskRef:
+ name: detect-changed-services
+ workspaces:
+ - name: source
+ workspace: shared-workspace
+
+ - name: build-and-push
+ runAfter: [detect-changes]
+ taskRef:
+ name: kaniko-build
+ when:
+ - input: "$(tasks.detect-changes.results.changed-services)"
+ operator: notin
+ values: ["none"]
+ workspaces:
+ - name: source
+ workspace: shared-workspace
+ - name: docker-credentials
+ workspace: docker-credentials
+ params:
+ - name: services
+ value: $(tasks.detect-changes.results.changed-services)
+ - name: registry
+ value: $(params.registry)
+ - name: git-revision
+ value: $(params.git-revision)
+
+ - name: update-gitops-manifests
+ runAfter: [build-and-push]
+ taskRef:
+ name: update-gitops
+ when:
+ - input: "$(tasks.detect-changes.results.changed-services)"
+ operator: notin
+ values: ["none"]
+ workspaces:
+ - name: source
+ workspace: shared-workspace
+ params:
+ - name: services
+ value: $(tasks.detect-changes.results.changed-services)
+ - name: registry
+ value: $(params.registry)
+ - name: git-revision
+ value: $(params.git-revision)
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/tasks/detect-changes.yaml b/infrastructure/ci-cd/tekton/tasks/detect-changes.yaml
new file mode 100644
index 00000000..32abd32c
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/tasks/detect-changes.yaml
@@ -0,0 +1,64 @@
+# Tekton Detect Changed Services Task for Bakery-IA CI/CD
+# This task identifies which services have changed in the repository
+
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: detect-changed-services
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: source
+ results:
+ - name: changed-services
+ description: Comma-separated list of changed services
+ steps:
+ - name: detect
+ image: alpine/git
+ script: |
+      #!/bin/sh
+      set -e
+      cd $(workspaces.source.path)
+
+      echo "Detecting changed files..."
+      # Get list of changed files compared to previous commit
+      CHANGED_FILES=$(git diff --name-only HEAD~1 HEAD 2>/dev/null || git diff --name-only HEAD)
+
+      echo "Changed files: $CHANGED_FILES"
+
+      # Map files to services.
+      # NOTE: alpine/git only ships a busybox shell, so this script is written
+      # in POSIX sh (no bash arrays or [[ ]]); changed services are collected
+      # in a space-separated list and de-duplicated as they are added.
+      CHANGED_SERVICES=""
+      add_service() {
+        case " $CHANGED_SERVICES " in
+          *" $1 "*) ;;  # already recorded
+          *) CHANGED_SERVICES="$CHANGED_SERVICES $1" ;;
+        esac
+      }
+
+      for file in $CHANGED_FILES; do
+        case "$file" in
+          services/*) add_service "$(echo "$file" | cut -d'/' -f2)" ;;
+          frontend/*) add_service "frontend" ;;
+          gateway/*) add_service "gateway" ;;
+        esac
+      done
+
+      # If no specific services changed, check for infrastructure changes
+      if [ -z "$CHANGED_SERVICES" ]; then
+        for file in $CHANGED_FILES; do
+          case "$file" in
+            infrastructure/*)
+              CHANGED_SERVICES="infrastructure"
+              break
+              ;;
+          esac
+        done
+      fi
+
+      # Output result as a comma-separated list, without a trailing newline so
+      # the pipeline's when-expression comparison against "none" is not affected
+      if [ -z "$CHANGED_SERVICES" ]; then
+        echo "No service changes detected"
+        printf "none" | tee $(results.changed-services.path)
+      else
+        echo "Detected changes in services:$CHANGED_SERVICES"
+        printf "%s" "$CHANGED_SERVICES" | sed 's/^ //' | tr ' ' ',' | tee $(results.changed-services.path)
+      fi
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/tasks/git-clone.yaml b/infrastructure/ci-cd/tekton/tasks/git-clone.yaml
new file mode 100644
index 00000000..5decee5c
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/tasks/git-clone.yaml
@@ -0,0 +1,31 @@
+# Tekton Git Clone Task for Bakery-IA CI/CD
+# This task clones the source code repository
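+#
+# NOTE: this minimal task clones anonymously; for a private Gitea repository the
+# credentials would need to be supplied separately (for example a token embedded
+# in the URL or a basic-auth workspace), which is not configured here.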
+
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: git-clone
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: output
+ params:
+ - name: url
+ type: string
+ description: Repository URL to clone
+ - name: revision
+ type: string
+ description: Git revision to checkout
+ default: "main"
+ steps:
+ - name: clone
+ image: alpine/git
+ script: |
+ #!/bin/sh
+ set -e
+ echo "Cloning repository: $(params.url)"
+ git clone $(params.url) $(workspaces.output.path)
+ cd $(workspaces.output.path)
+ echo "Checking out revision: $(params.revision)"
+ git checkout $(params.revision)
+ echo "Repository cloned successfully"
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml b/infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml
new file mode 100644
index 00000000..1f9290e8
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml
@@ -0,0 +1,40 @@
+# Tekton Kaniko Build Task for Bakery-IA CI/CD
+# This task builds and pushes container images using Kaniko
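+#
+# NOTE: as written, this task builds one service per run and assumes the Dockerfile
+# lives at services/<name>/Dockerfile; building several changed services (or
+# frontend/gateway, which live outside services/) would need a loop or a Pipeline
+# matrix. The docker-credentials workspace is expected to contain a Docker
+# config.json with registry credentials, e.g. (sketch, local config path assumed):
+#   kubectl create secret generic gitea-registry-credentials \
+#     --namespace tekton-pipelines \
+#     --from-file=config.json=$HOME/.docker/config.json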
+
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: kaniko-build
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: source
+  - name: docker-credentials
+    # Mount the registry credentials where Kaniko expects its Docker config
+    mountPath: /kaniko/.docker
+ params:
+ - name: services
+ type: string
+ description: Comma-separated list of services to build
+ - name: registry
+ type: string
+ description: Container registry URL
+ default: "gitea.bakery-ia.local:5000"
+ - name: git-revision
+ type: string
+ description: Git revision for image tag
+ default: "latest"
+ steps:
+ - name: build-and-push
+ image: gcr.io/kaniko-project/executor:v1.9.0
+ args:
+ - --dockerfile=$(workspaces.source.path)/services/$(params.services)/Dockerfile
+ - --context=$(workspaces.source.path)
+ - --destination=$(params.registry)/bakery/$(params.services):$(params.git-revision)
+ - --verbosity=info
+    securityContext:
+      runAsUser: 0
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/tasks/update-gitops.yaml b/infrastructure/ci-cd/tekton/tasks/update-gitops.yaml
new file mode 100644
index 00000000..1e89d170
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/tasks/update-gitops.yaml
@@ -0,0 +1,66 @@
+# Tekton Update GitOps Manifests Task for Bakery-IA CI/CD
+# This task updates Kubernetes manifests with new image tags
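+#
+# After this task pushes the updated manifests, Flux picks up the commit on its
+# next GitRepository sync (interval: 1m in flux/git-repository.yaml) and applies
+# it via the bakery-ia-prod Kustomization.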
+
+apiVersion: tekton.dev/v1beta1
+kind: Task
+metadata:
+ name: update-gitops
+ namespace: tekton-pipelines
+spec:
+ workspaces:
+ - name: source
+ params:
+ - name: services
+ type: string
+ description: Comma-separated list of services to update
+ - name: registry
+ type: string
+ description: Container registry URL
+ - name: git-revision
+ type: string
+ description: Git revision for image tag
+ steps:
+ - name: update-manifests
+    # This step only needs git and sed (not kubectl), so a small git image is enough
+    image: alpine/git
+ script: |
+ #!/bin/sh
+ set -e
+ cd $(workspaces.source.path)
+
+ echo "Updating GitOps manifests for services: $(params.services)"
+
+      # Iterate over the comma-separated service list.
+      # NOTE: this runs under POSIX sh (no bash arrays or here-strings), so the
+      # list is split with tr. Frontend, gateway and backend services all follow
+      # the same <service>-deployment.yaml naming convention in the prod overlay.
+      for service in $(echo "$(params.services)" | tr ',' ' '); do
+        echo "Processing service: $service"
+
+        DEPLOYMENT_FILE="infrastructure/kubernetes/overlays/prod/${service}-deployment.yaml"
+        if [ -f "$DEPLOYMENT_FILE" ]; then
+          sed -i "s|image:.*|image: $(params.registry)/bakery/${service}:$(params.git-revision)|g" \
+            "$DEPLOYMENT_FILE"
+        else
+          echo "Warning: $DEPLOYMENT_FILE not found, skipping"
+        fi
+      done
+
+      # Commit and push the updated manifests back to Gitea so Flux can apply them.
+      # NOTE: the workspace may be on a detached HEAD (checked out at a commit SHA),
+      # so push explicitly to main. Pushing also requires write credentials, e.g. a
+      # token embedded in the clone URL or a configured git credential helper.
+      git config --global user.name "bakery-ia-ci"
+      git config --global user.email "ci@bakery-ia.local"
+      git add .
+      git commit -m "CI: Update image tags for $(params.services) to $(params.git-revision)"
+      git push origin HEAD:main
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/triggers/event-listener.yaml b/infrastructure/ci-cd/tekton/triggers/event-listener.yaml
new file mode 100644
index 00000000..5049bacb
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/triggers/event-listener.yaml
@@ -0,0 +1,26 @@
+# Tekton EventListener for Bakery-IA CI/CD
+# This listener receives webhook events and triggers pipelines
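+#
+# The webhook secret referenced below can be created with, for example:
+#   kubectl create secret generic gitea-webhook-secret \
+#     --namespace tekton-pipelines \
+#     --from-literal=secretToken=<random-token>
+# The Gitea webhook should then point at the Service that Tekton creates for this
+# EventListener (typically el-bakery-ia-listener:8080; name and port assumed from
+# Tekton Triggers defaults). Gitea push payloads are GitHub-style, so the gitlab
+# interceptor configuration may need verification against the actual webhook.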
+
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: EventListener
+metadata:
+ name: bakery-ia-listener
+ namespace: tekton-pipelines
+spec:
+ serviceAccountName: tekton-triggers-sa
+ triggers:
+ - name: bakery-ia-gitea-trigger
+ bindings:
+ - ref: bakery-ia-trigger-binding
+ template:
+ ref: bakery-ia-trigger-template
+ interceptors:
+ - ref:
+ name: "gitlab"
+ params:
+ - name: "secretRef"
+ value:
+ secretName: gitea-webhook-secret
+ secretKey: secretToken
+ - name: "eventTypes"
+ value: ["push"]
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml b/infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml
new file mode 100644
index 00000000..c8fc1c26
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml
@@ -0,0 +1,14 @@
+# GitLab/Gitea Webhook Interceptor for Tekton Triggers
+# This interceptor validates and processes Gitea webhook events
+
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: ClusterInterceptor
+metadata:
+ name: gitlab
+spec:
+ clientConfig:
+ service:
+ name: tekton-triggers-core-interceptors
+ namespace: tekton-pipelines
+ path: "/v1/webhook/gitlab"
+ port: 8443
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml b/infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml
new file mode 100644
index 00000000..8116792a
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml
@@ -0,0 +1,16 @@
+# Tekton TriggerBinding for Bakery-IA CI/CD
+# This binding extracts parameters from Gitea webhook events
+
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: TriggerBinding
+metadata:
+ name: bakery-ia-trigger-binding
+ namespace: tekton-pipelines
+spec:
+ params:
+ - name: git-repo-url
+ value: $(body.repository.clone_url)
+ - name: git-revision
+ value: $(body.head_commit.id)
+ - name: git-repo-name
+ value: $(body.repository.name)
\ No newline at end of file
diff --git a/infrastructure/ci-cd/tekton/triggers/trigger-template.yaml b/infrastructure/ci-cd/tekton/triggers/trigger-template.yaml
new file mode 100644
index 00000000..17208bf8
--- /dev/null
+++ b/infrastructure/ci-cd/tekton/triggers/trigger-template.yaml
@@ -0,0 +1,43 @@
+# Tekton TriggerTemplate for Bakery-IA CI/CD
+# This template defines how PipelineRuns are created when triggers fire
+
+apiVersion: triggers.tekton.dev/v1alpha1
+kind: TriggerTemplate
+metadata:
+ name: bakery-ia-trigger-template
+ namespace: tekton-pipelines
+spec:
+ params:
+ - name: git-repo-url
+ description: The git repository URL
+ - name: git-revision
+ description: The git revision/commit hash
+ - name: git-repo-name
+ description: The git repository name
+ default: "bakery-ia"
+ resourcetemplates:
+ - apiVersion: tekton.dev/v1beta1
+ kind: PipelineRun
+ metadata:
+ generateName: bakery-ia-ci-run-$(params.git-repo-name)-
+ spec:
+ pipelineRef:
+ name: bakery-ia-ci
+ workspaces:
+ - name: shared-workspace
+ volumeClaimTemplate:
+ spec:
+ accessModes: ["ReadWriteOnce"]
+ resources:
+ requests:
+ storage: 5Gi
+ - name: docker-credentials
+ secret:
+ secretName: gitea-registry-credentials
+ params:
+ - name: git-url
+ value: $(params.git-repo-url)
+ - name: git-revision
+ value: $(params.git-revision)
+ - name: registry
+ value: "gitea.bakery-ia.local:5000"
\ No newline at end of file
diff --git a/services/orchestrator/app/main.py b/services/orchestrator/app/main.py
index 33271ecb..94b5351c 100644
--- a/services/orchestrator/app/main.py
+++ b/services/orchestrator/app/main.py
@@ -18,6 +18,28 @@ class OrchestratorService(StandardFastAPIService):
expected_migration_version = "001_initial_schema"
+ def __init__(self):
+ # Define expected database tables for health checks
+ orchestrator_expected_tables = [
+ 'orchestration_runs'
+ ]
+
+ self.rabbitmq_client = None
+ self.event_publisher = None
+ self.leader_election = None
+ self.scheduler_service = None
+
+ super().__init__(
+ service_name="orchestrator-service",
+ app_name=settings.APP_NAME,
+ description=settings.DESCRIPTION,
+ version=settings.VERSION,
+ api_prefix="", # Empty because RouteBuilder already includes /api/v1
+ database_manager=database_manager,
+ expected_tables=orchestrator_expected_tables,
+ enable_messaging=True # Enable RabbitMQ for event publishing
+ )
+
async def verify_migrations(self):
"""Verify database schema matches the latest migrations"""
try:
@@ -32,26 +54,6 @@ class OrchestratorService(StandardFastAPIService):
self.logger.error(f"Migration verification failed: {e}")
raise
- def __init__(self):
- # Define expected database tables for health checks
- orchestrator_expected_tables = [
- 'orchestration_runs'
- ]
-
- self.rabbitmq_client = None
- self.event_publisher = None
-
- super().__init__(
- service_name="orchestrator-service",
- app_name=settings.APP_NAME,
- description=settings.DESCRIPTION,
- version=settings.VERSION,
- api_prefix="", # Empty because RouteBuilder already includes /api/v1
- database_manager=database_manager,
- expected_tables=orchestrator_expected_tables,
- enable_messaging=True # Enable RabbitMQ for event publishing
- )
-
async def _setup_messaging(self):
"""Setup messaging for orchestrator service"""
from shared.messaging import UnifiedEventPublisher, RabbitMQClient
@@ -84,22 +86,91 @@ class OrchestratorService(StandardFastAPIService):
self.logger.info("Orchestrator Service starting up...")
- # Initialize orchestrator scheduler service with EventPublisher
- from app.services.orchestrator_service import OrchestratorSchedulerService
- scheduler_service = OrchestratorSchedulerService(self.event_publisher, settings)
- await scheduler_service.start()
- app.state.scheduler_service = scheduler_service
- self.logger.info("Orchestrator scheduler service started")
+ # Initialize leader election for horizontal scaling
+ # Only the leader pod will run the scheduler
+ await self._setup_leader_election(app)
# REMOVED: Delivery tracking service - moved to procurement service (domain ownership)
+ async def _setup_leader_election(self, app: FastAPI):
+ """
+ Setup leader election for scheduler.
+
+ CRITICAL FOR HORIZONTAL SCALING:
+ Without leader election, each pod would run the same scheduled jobs,
+ causing duplicate forecasts, production schedules, and database contention.
+ """
+ from shared.leader_election import LeaderElectionService
+ import redis.asyncio as redis
+
+ try:
+ # Create Redis connection for leader election
+ redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"
+ if settings.REDIS_TLS_ENABLED.lower() == "true":
+ redis_url = redis_url.replace("redis://", "rediss://")
+
+ redis_client = redis.from_url(redis_url, decode_responses=False)
+ await redis_client.ping()
+
+ # Use shared leader election service
+ self.leader_election = LeaderElectionService(
+ redis_client,
+ service_name="orchestrator"
+ )
+
+ # Define callbacks for leader state changes
+ async def on_become_leader():
+ self.logger.info("This pod became the leader - starting scheduler")
+ from app.services.orchestrator_service import OrchestratorSchedulerService
+ self.scheduler_service = OrchestratorSchedulerService(self.event_publisher, settings)
+ await self.scheduler_service.start()
+ app.state.scheduler_service = self.scheduler_service
+ self.logger.info("Orchestrator scheduler service started (leader only)")
+
+ async def on_lose_leader():
+ self.logger.warning("This pod lost leadership - stopping scheduler")
+ if self.scheduler_service:
+ await self.scheduler_service.stop()
+ self.scheduler_service = None
+ if hasattr(app.state, 'scheduler_service'):
+ app.state.scheduler_service = None
+ self.logger.info("Orchestrator scheduler service stopped (no longer leader)")
+
+ # Start leader election
+ await self.leader_election.start(
+ on_become_leader=on_become_leader,
+ on_lose_leader=on_lose_leader
+ )
+
+ # Store leader election in app state for health checks
+ app.state.leader_election = self.leader_election
+
+ self.logger.info("Leader election initialized",
+ is_leader=self.leader_election.is_leader,
+ instance_id=self.leader_election.instance_id)
+
+ except Exception as e:
+ self.logger.error("Failed to setup leader election, falling back to standalone mode",
+ error=str(e))
+ # Fallback: start scheduler anyway (for single-pod deployments)
+ from app.services.orchestrator_service import OrchestratorSchedulerService
+ self.scheduler_service = OrchestratorSchedulerService(self.event_publisher, settings)
+ await self.scheduler_service.start()
+ app.state.scheduler_service = self.scheduler_service
+ self.logger.warning("Scheduler started in standalone mode (no leader election)")
+
async def on_shutdown(self, app: FastAPI):
"""Custom shutdown logic for orchestrator service"""
self.logger.info("Orchestrator Service shutting down...")
- # Stop scheduler service
- if hasattr(app.state, 'scheduler_service'):
- await app.state.scheduler_service.stop()
+ # Stop leader election (this will also stop scheduler if we're the leader)
+ if self.leader_election:
+ await self.leader_election.stop()
+ self.logger.info("Leader election stopped")
+
+ # Stop scheduler service if still running
+ if self.scheduler_service:
+ await self.scheduler_service.stop()
self.logger.info("Orchestrator scheduler service stopped")
diff --git a/services/procurement/app/services/delivery_tracking_service.py b/services/procurement/app/services/delivery_tracking_service.py
index 32fe8167..fa56a730 100644
--- a/services/procurement/app/services/delivery_tracking_service.py
+++ b/services/procurement/app/services/delivery_tracking_service.py
@@ -1,12 +1,12 @@
"""
-Delivery Tracking Service - Simplified
+Delivery Tracking Service - With Leader Election
Tracks purchase order deliveries and generates appropriate alerts using EventPublisher:
- DELIVERY_ARRIVING_SOON: 2 hours before delivery window
- DELIVERY_OVERDUE: 30 minutes after expected delivery time
- STOCK_RECEIPT_INCOMPLETE: If delivery not marked as received
-Runs as internal scheduler with leader election.
+Runs as internal scheduler with leader election for horizontal scaling.
Domain ownership: Procurement service owns all PO and delivery tracking.
"""
@@ -30,7 +30,7 @@ class DeliveryTrackingService:
Monitors PO deliveries and generates time-based alerts using EventPublisher.
Uses APScheduler with leader election to run hourly checks.
- Only one pod executes checks (others skip if not leader).
+ Only one pod executes checks - leader election ensures no duplicate alerts.
"""
def __init__(self, event_publisher: UnifiedEventPublisher, config, database_manager=None):
@@ -38,46 +38,121 @@ class DeliveryTrackingService:
self.config = config
self.database_manager = database_manager
self.scheduler = AsyncIOScheduler()
- self.is_leader = False
+ self._leader_election = None
+ self._redis_client = None
+ self._scheduler_started = False
self.instance_id = str(uuid4())[:8] # Short instance ID for logging
async def start(self):
- """Start the delivery tracking scheduler"""
- # Initialize and start scheduler if not already running
+ """Start the delivery tracking scheduler with leader election"""
+ try:
+ # Initialize leader election
+ await self._setup_leader_election()
+ except Exception as e:
+ logger.error("Failed to setup leader election, starting in standalone mode",
+ error=str(e))
+ # Fallback: start scheduler without leader election
+ await self._start_scheduler()
+
+ async def _setup_leader_election(self):
+ """Setup Redis-based leader election for horizontal scaling"""
+ from shared.leader_election import LeaderElectionService
+ import redis.asyncio as redis
+
+ # Build Redis URL from config
+ redis_url = getattr(self.config, 'REDIS_URL', None)
+ if not redis_url:
+ redis_password = getattr(self.config, 'REDIS_PASSWORD', '')
+ redis_host = getattr(self.config, 'REDIS_HOST', 'localhost')
+ redis_port = getattr(self.config, 'REDIS_PORT', 6379)
+ redis_db = getattr(self.config, 'REDIS_DB', 0)
+ redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
+
+ self._redis_client = redis.from_url(redis_url, decode_responses=False)
+ await self._redis_client.ping()
+
+ # Create leader election service
+ self._leader_election = LeaderElectionService(
+ self._redis_client,
+ service_name="procurement-delivery-tracking"
+ )
+
+ # Start leader election with callbacks
+ await self._leader_election.start(
+ on_become_leader=self._on_become_leader,
+ on_lose_leader=self._on_lose_leader
+ )
+
+ logger.info("Leader election initialized for delivery tracking",
+ is_leader=self._leader_election.is_leader,
+ instance_id=self.instance_id)
+
+ async def _on_become_leader(self):
+ """Called when this instance becomes the leader"""
+ logger.info("Became leader for delivery tracking - starting scheduler",
+ instance_id=self.instance_id)
+ await self._start_scheduler()
+
+ async def _on_lose_leader(self):
+ """Called when this instance loses leadership"""
+ logger.warning("Lost leadership for delivery tracking - stopping scheduler",
+ instance_id=self.instance_id)
+ await self._stop_scheduler()
+
+ async def _start_scheduler(self):
+ """Start the APScheduler with delivery tracking jobs"""
+ if self._scheduler_started:
+ logger.debug("Scheduler already started", instance_id=self.instance_id)
+ return
+
if not self.scheduler.running:
# Add hourly job to check deliveries
self.scheduler.add_job(
self._check_all_tenants,
- trigger=CronTrigger(minute=30), # Run every hour at :30 (00:30, 01:30, 02:30, etc.)
+ trigger=CronTrigger(minute=30), # Run every hour at :30
id='hourly_delivery_check',
name='Hourly Delivery Tracking',
replace_existing=True,
- max_instances=1, # Ensure no overlapping runs
- coalesce=True # Combine missed runs
+ max_instances=1,
+ coalesce=True
)
self.scheduler.start()
+ self._scheduler_started = True
- # Log next run time
next_run = self.scheduler.get_job('hourly_delivery_check').next_run_time
- logger.info(
- "Delivery tracking scheduler started with hourly checks",
- instance_id=self.instance_id,
- next_run=next_run.isoformat() if next_run else None
- )
- else:
- logger.info(
- "Delivery tracking scheduler already running",
- instance_id=self.instance_id
- )
+ logger.info("Delivery tracking scheduler started",
+ instance_id=self.instance_id,
+ next_run=next_run.isoformat() if next_run else None)
+
+ async def _stop_scheduler(self):
+ """Stop the APScheduler"""
+ if not self._scheduler_started:
+ return
+
+ if self.scheduler.running:
+ self.scheduler.shutdown(wait=False)
+ self._scheduler_started = False
+ logger.info("Delivery tracking scheduler stopped", instance_id=self.instance_id)
async def stop(self):
- """Stop the scheduler and release leader lock"""
- if self.scheduler.running:
- self.scheduler.shutdown(wait=True) # Graceful shutdown
- logger.info("Delivery tracking scheduler stopped", instance_id=self.instance_id)
- else:
- logger.info("Delivery tracking scheduler already stopped", instance_id=self.instance_id)
+ """Stop the scheduler and leader election"""
+ # Stop leader election first
+ if self._leader_election:
+ await self._leader_election.stop()
+ logger.info("Leader election stopped", instance_id=self.instance_id)
+
+ # Stop scheduler
+ await self._stop_scheduler()
+
+ # Close Redis
+ if self._redis_client:
+ await self._redis_client.close()
+
+ @property
+ def is_leader(self) -> bool:
+ """Check if this instance is the leader"""
+ return self._leader_election.is_leader if self._leader_election else True
async def _check_all_tenants(self):
"""
diff --git a/services/training/app/main.py b/services/training/app/main.py
index 89e76c3b..f40497dd 100644
--- a/services/training/app/main.py
+++ b/services/training/app/main.py
@@ -46,6 +46,9 @@ class TrainingService(StandardFastAPIService):
await setup_messaging()
self.logger.info("Messaging setup completed")
+ # Initialize Redis pub/sub for cross-pod WebSocket broadcasting
+ await self._setup_websocket_redis()
+
# Set up WebSocket event consumer (listens to RabbitMQ and broadcasts to WebSockets)
success = await setup_websocket_event_consumer()
if success:
@@ -53,8 +56,44 @@ class TrainingService(StandardFastAPIService):
else:
self.logger.warning("WebSocket event consumer setup failed")
+ async def _setup_websocket_redis(self):
+ """
+ Initialize Redis pub/sub for WebSocket cross-pod broadcasting.
+
+ CRITICAL FOR HORIZONTAL SCALING:
+ Without this, WebSocket clients on Pod A won't receive events
+ from training jobs running on Pod B.
+ """
+ try:
+ from app.websocket.manager import websocket_manager
+ from app.core.config import settings
+
+ redis_url = settings.REDIS_URL
+ success = await websocket_manager.initialize_redis(redis_url)
+
+ if success:
+ self.logger.info("WebSocket Redis pub/sub initialized for horizontal scaling")
+ else:
+ self.logger.warning(
+ "WebSocket Redis pub/sub failed to initialize. "
+ "WebSocket events will only be delivered to local connections."
+ )
+
+ except Exception as e:
+ self.logger.error("Failed to setup WebSocket Redis pub/sub",
+ error=str(e))
+ # Don't fail startup - WebSockets will work locally without Redis
+
async def _cleanup_messaging(self):
"""Cleanup messaging for training service"""
+ # Shutdown WebSocket Redis pub/sub
+ try:
+ from app.websocket.manager import websocket_manager
+ await websocket_manager.shutdown()
+ self.logger.info("WebSocket Redis pub/sub shutdown completed")
+ except Exception as e:
+ self.logger.warning("Error shutting down WebSocket Redis", error=str(e))
+
await cleanup_websocket_consumers()
await cleanup_messaging()
@@ -78,13 +117,49 @@ class TrainingService(StandardFastAPIService):
async def on_startup(self, app: FastAPI):
"""Custom startup logic including migration verification"""
await self.verify_migrations()
-
+
# Initialize system metrics collection
system_metrics = SystemMetricsCollector("training")
self.logger.info("System metrics collection started")
-
+
+ # Recover stale jobs from previous pod crashes
+ # This is important for horizontal scaling - jobs may be left in 'running'
+ # state if a pod crashes. We mark them as failed so they can be retried.
+ await self._recover_stale_jobs()
+
self.logger.info("Training service startup completed")
+ async def _recover_stale_jobs(self):
+ """
+ Recover stale training jobs on startup.
+
+ When a pod crashes mid-training, jobs are left in 'running' or 'pending' state.
+ This method finds jobs that haven't been updated in a while and marks them
+ as failed so users can retry them.
+ """
+ try:
+ from app.repositories.training_log_repository import TrainingLogRepository
+
+ async with self.database_manager.get_session() as session:
+ log_repo = TrainingLogRepository(session)
+
+ # Recover jobs that haven't been updated in 60 minutes
+ # This is conservative - most training jobs complete within 30 minutes
+ recovered = await log_repo.recover_stale_jobs(stale_threshold_minutes=60)
+
+ if recovered:
+ self.logger.warning(
+ "Recovered stale training jobs on startup",
+ recovered_count=len(recovered),
+ job_ids=[j.job_id for j in recovered]
+ )
+ else:
+ self.logger.info("No stale training jobs to recover")
+
+ except Exception as e:
+ # Don't fail startup if recovery fails - just log the error
+ self.logger.error("Failed to recover stale jobs on startup", error=str(e))
+
async def on_shutdown(self, app: FastAPI):
"""Custom shutdown logic for training service"""
await cleanup_training_database()
diff --git a/services/training/app/repositories/training_log_repository.py b/services/training/app/repositories/training_log_repository.py
index eee6aef3..7bae3098 100644
--- a/services/training/app/repositories/training_log_repository.py
+++ b/services/training/app/repositories/training_log_repository.py
@@ -342,4 +342,166 @@ class TrainingLogRepository(TrainingBaseRepository):
logger.error("Failed to get start time",
job_id=job_id,
error=str(e))
- return None
\ No newline at end of file
+ return None
+
+ async def create_job_atomic(
+ self,
+ job_id: str,
+ tenant_id: str,
+ config: Dict[str, Any] = None
+ ) -> tuple[Optional[ModelTrainingLog], bool]:
+ """
+ Atomically create a training job, respecting the unique constraint.
+
+        This method performs a check-then-insert and relies on the partial unique
+        index (idx_unique_active_training_per_tenant) to resolve races when multiple
+        pods try to create a job for the same tenant simultaneously: a constraint
+        violation on insert is caught and the existing active job is returned instead.
+
+ Args:
+ job_id: Unique job identifier
+ tenant_id: Tenant identifier
+ config: Optional job configuration
+
+ Returns:
+ Tuple of (job, created):
+ - If created: (new_job, True)
+ - If conflict (existing active job): (existing_job, False)
+ - If error: raises DatabaseError
+ """
+ try:
+ # First, try to find an existing active job
+ existing = await self.get_active_jobs(tenant_id=tenant_id)
+ pending = await self.get_logs_by_tenant(tenant_id=tenant_id, status="pending", limit=1)
+
+ if existing or pending:
+ # Return existing job
+ active_job = existing[0] if existing else pending[0]
+ logger.info("Found existing active job, skipping creation",
+ existing_job_id=active_job.job_id,
+ tenant_id=tenant_id,
+ requested_job_id=job_id)
+ return (active_job, False)
+
+ # Try to create the new job
+ # If another pod created one in the meantime, the unique constraint will prevent this
+ log_data = {
+ "job_id": job_id,
+ "tenant_id": tenant_id,
+ "status": "pending",
+ "progress": 0,
+ "current_step": "initializing",
+ "config": config or {}
+ }
+
+ try:
+ new_job = await self.create_training_log(log_data)
+ await self.session.commit()
+ logger.info("Created new training job atomically",
+ job_id=job_id,
+ tenant_id=tenant_id)
+ return (new_job, True)
+ except Exception as create_error:
+ error_str = str(create_error).lower()
+ # Check if this is a unique constraint violation
+ if "unique" in error_str or "duplicate" in error_str or "constraint" in error_str:
+ await self.session.rollback()
+ # Another pod created a job, fetch it
+ logger.info("Unique constraint hit, fetching existing job",
+ tenant_id=tenant_id,
+ requested_job_id=job_id)
+ existing = await self.get_active_jobs(tenant_id=tenant_id)
+ pending = await self.get_logs_by_tenant(tenant_id=tenant_id, status="pending", limit=1)
+ if existing or pending:
+ active_job = existing[0] if existing else pending[0]
+ return (active_job, False)
+ # If still no job found, something went wrong
+ raise DatabaseError(f"Constraint violation but no active job found: {create_error}")
+ else:
+ raise
+
+ except DatabaseError:
+ raise
+ except Exception as e:
+ logger.error("Failed to create job atomically",
+ job_id=job_id,
+ tenant_id=tenant_id,
+ error=str(e))
+ raise DatabaseError(f"Failed to create training job atomically: {str(e)}")
+
+ async def recover_stale_jobs(self, stale_threshold_minutes: int = 60) -> List[ModelTrainingLog]:
+ """
+ Find and mark stale running jobs as failed.
+
+ This is used during service startup to clean up jobs that were
+ running when a pod crashed. With multiple replicas, only stale
+ jobs (not updated recently) should be marked as failed.
+
+ Args:
+ stale_threshold_minutes: Jobs not updated for this long are considered stale
+
+ Returns:
+ List of jobs that were marked as failed
+ """
+ try:
+ stale_cutoff = datetime.now() - timedelta(minutes=stale_threshold_minutes)
+
+ # Find running jobs that haven't been updated recently
+ query = text("""
+ SELECT id, job_id, tenant_id, status, updated_at
+ FROM model_training_logs
+ WHERE status IN ('running', 'pending')
+ AND updated_at < :stale_cutoff
+ """)
+
+ result = await self.session.execute(query, {"stale_cutoff": stale_cutoff})
+ stale_jobs = result.fetchall()
+
+ recovered_jobs = []
+ for row in stale_jobs:
+ try:
+ # Mark as failed
+ update_query = text("""
+ UPDATE model_training_logs
+ SET status = 'failed',
+ error_message = :error_msg,
+ end_time = :end_time,
+ updated_at = :updated_at
+ WHERE id = :id AND status IN ('running', 'pending')
+ """)
+
+ await self.session.execute(update_query, {
+ "id": row.id,
+ "error_msg": f"Job recovered as failed - not updated since {row.updated_at.isoformat()}. Pod may have crashed.",
+ "end_time": datetime.now(),
+ "updated_at": datetime.now()
+ })
+
+ logger.warning("Recovered stale training job",
+ job_id=row.job_id,
+ tenant_id=str(row.tenant_id),
+ last_updated=row.updated_at.isoformat() if row.updated_at else "unknown")
+
+ # Fetch the updated job to return
+ job = await self.get_by_job_id(row.job_id)
+ if job:
+ recovered_jobs.append(job)
+
+ except Exception as job_error:
+ logger.error("Failed to recover individual stale job",
+ job_id=row.job_id,
+ error=str(job_error))
+
+ if recovered_jobs:
+ await self.session.commit()
+ logger.info("Stale job recovery completed",
+ recovered_count=len(recovered_jobs),
+ stale_threshold_minutes=stale_threshold_minutes)
+
+ return recovered_jobs
+
+ except Exception as e:
+ logger.error("Failed to recover stale jobs",
+ error=str(e))
+ await self.session.rollback()
+ return []
\ No newline at end of file
diff --git a/services/training/app/utils/distributed_lock.py b/services/training/app/utils/distributed_lock.py
index c167b006..c63084db 100644
--- a/services/training/app/utils/distributed_lock.py
+++ b/services/training/app/utils/distributed_lock.py
@@ -1,10 +1,16 @@
"""
Distributed Locking Mechanisms
Prevents concurrent training jobs for the same product
+
+HORIZONTAL SCALING FIX:
+- Uses SHA256 for stable hash across all Python processes/pods
+- Python's built-in hash() varies between processes due to hash randomization (Python 3.3+)
+- This ensures all pods compute the same lock ID for the same lock name
"""
import asyncio
import time
+import hashlib
from typing import Optional
import logging
from contextlib import asynccontextmanager
@@ -39,9 +45,20 @@ class DatabaseLock:
self.lock_id = self._hash_lock_name(lock_name)
def _hash_lock_name(self, name: str) -> int:
- """Convert lock name to integer ID for PostgreSQL advisory lock"""
- # Use hash and modulo to get a positive 32-bit integer
- return abs(hash(name)) % (2**31)
+ """
+ Convert lock name to integer ID for PostgreSQL advisory lock.
+
+ CRITICAL: Uses SHA256 for stable hash across all Python processes/pods.
+ Python's built-in hash() varies between processes due to hash randomization
+ (PYTHONHASHSEED, enabled by default since Python 3.3), which would cause
+ different pods to compute different lock IDs for the same lock name,
+ defeating the purpose of distributed locking.
+ """
+ # Use SHA256 for stable, cross-process hash
+ hash_bytes = hashlib.sha256(name.encode('utf-8')).digest()
+ # Take first 4 bytes and convert to positive 31-bit integer
+ # (PostgreSQL advisory locks use bigint, but we use 31-bit for safety)
+ return int.from_bytes(hash_bytes[:4], 'big') % (2**31)
@asynccontextmanager
async def acquire(self, session: AsyncSession):
diff --git a/services/training/app/websocket/manager.py b/services/training/app/websocket/manager.py
index e8e81245..a9a6b89c 100644
--- a/services/training/app/websocket/manager.py
+++ b/services/training/app/websocket/manager.py
@@ -1,21 +1,39 @@
"""
WebSocket Connection Manager for Training Service
Manages WebSocket connections and broadcasts RabbitMQ events to connected clients
+
+HORIZONTAL SCALING:
+- Uses Redis pub/sub for cross-pod WebSocket broadcasting
+- Each pod subscribes to a Redis channel and broadcasts to its local connections
+- Events published to Redis are received by all pods, ensuring clients on any
+ pod receive events from training jobs running on any other pod
"""
import asyncio
import json
-from typing import Dict, Set
+import os
+from typing import Dict, Optional
from fastapi import WebSocket
import structlog
logger = structlog.get_logger()
+# Redis pub/sub channel for WebSocket events
+REDIS_WEBSOCKET_CHANNEL = "training:websocket:events"
+
class WebSocketConnectionManager:
"""
- Simple WebSocket connection manager.
- Manages connections per job_id and broadcasts messages to all connected clients.
+ WebSocket connection manager with Redis pub/sub for horizontal scaling.
+
+ In a multi-pod deployment:
+ 1. Events are published to Redis pub/sub (not just local broadcast)
+ 2. Each pod subscribes to Redis and broadcasts to its local WebSocket connections
+ 3. This ensures clients connected to any pod receive events from any pod
+
+ Flow:
+ - RabbitMQ event → Pod A receives → Pod A publishes to Redis
+ - Redis pub/sub → All pods receive → Each pod broadcasts to local WebSockets
"""
def __init__(self):
@@ -24,6 +42,121 @@ class WebSocketConnectionManager:
self._lock = asyncio.Lock()
# Store latest event for each job to provide initial state
self._latest_events: Dict[str, dict] = {}
+ # Redis client for pub/sub
+ self._redis: Optional[object] = None
+ self._pubsub: Optional[object] = None
+ self._subscriber_task: Optional[asyncio.Task] = None
+ self._running = False
+ self._instance_id = f"{os.environ.get('HOSTNAME', 'unknown')}:{os.getpid()}"
+
+ async def initialize_redis(self, redis_url: str) -> bool:
+ """
+ Initialize Redis connection for cross-pod pub/sub.
+
+ Args:
+ redis_url: Redis connection URL
+
+ Returns:
+ True if successful, False otherwise
+ """
+ try:
+ import redis.asyncio as redis_async
+
+ self._redis = redis_async.from_url(redis_url, decode_responses=True)
+ await self._redis.ping()
+
+ # Create pub/sub subscriber
+ self._pubsub = self._redis.pubsub()
+ await self._pubsub.subscribe(REDIS_WEBSOCKET_CHANNEL)
+
+ # Start subscriber task
+ self._running = True
+ self._subscriber_task = asyncio.create_task(self._redis_subscriber_loop())
+
+ logger.info("Redis pub/sub initialized for WebSocket broadcasting",
+ instance_id=self._instance_id,
+ channel=REDIS_WEBSOCKET_CHANNEL)
+ return True
+
+ except Exception as e:
+ logger.error("Failed to initialize Redis pub/sub",
+ error=str(e),
+ instance_id=self._instance_id)
+ return False
+
+ async def shutdown(self):
+ """Shutdown Redis pub/sub connection"""
+ self._running = False
+
+ if self._subscriber_task:
+ self._subscriber_task.cancel()
+ try:
+ await self._subscriber_task
+ except asyncio.CancelledError:
+ pass
+
+ if self._pubsub:
+ await self._pubsub.unsubscribe(REDIS_WEBSOCKET_CHANNEL)
+ await self._pubsub.close()
+
+ if self._redis:
+ await self._redis.close()
+
+ logger.info("Redis pub/sub shutdown complete",
+ instance_id=self._instance_id)
+
+ async def _redis_subscriber_loop(self):
+ """Background task to receive Redis pub/sub messages and broadcast locally"""
+ try:
+ while self._running:
+ try:
+ message = await self._pubsub.get_message(
+ ignore_subscribe_messages=True,
+ timeout=1.0
+ )
+
+ if message and message['type'] == 'message':
+ await self._handle_redis_message(message['data'])
+
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.error("Error in Redis subscriber loop",
+ error=str(e),
+ instance_id=self._instance_id)
+ await asyncio.sleep(1) # Backoff on error
+
+ except asyncio.CancelledError:
+ pass
+
+ logger.info("Redis subscriber loop stopped",
+ instance_id=self._instance_id)
+
+ async def _handle_redis_message(self, data: str):
+ """Handle a message received from Redis pub/sub"""
+ try:
+ payload = json.loads(data)
+ job_id = payload.get('job_id')
+ message = payload.get('message')
+ source_instance = payload.get('source_instance')
+
+ if not job_id or not message:
+ return
+
+ # Log cross-pod message
+ if source_instance != self._instance_id:
+ logger.debug("Received cross-pod WebSocket event",
+ job_id=job_id,
+ source_instance=source_instance,
+ local_instance=self._instance_id)
+
+ # Broadcast to local WebSocket connections
+ await self._broadcast_local(job_id, message)
+
+ except json.JSONDecodeError as e:
+ logger.warning("Invalid JSON in Redis message", error=str(e))
+ except Exception as e:
+ logger.error("Error handling Redis message", error=str(e))
async def connect(self, job_id: str, websocket: WebSocket) -> None:
"""Register a new WebSocket connection for a job"""
@@ -50,7 +183,8 @@ class WebSocketConnectionManager:
logger.info("WebSocket connected",
job_id=job_id,
websocket_id=ws_id,
- total_connections=len(self._connections[job_id]))
+ total_connections=len(self._connections[job_id]),
+ instance_id=self._instance_id)
async def disconnect(self, job_id: str, websocket: WebSocket) -> None:
"""Remove a WebSocket connection"""
@@ -66,19 +200,56 @@ class WebSocketConnectionManager:
logger.info("WebSocket disconnected",
job_id=job_id,
websocket_id=ws_id,
- remaining_connections=len(self._connections.get(job_id, {})))
+ remaining_connections=len(self._connections.get(job_id, {})),
+ instance_id=self._instance_id)
async def broadcast(self, job_id: str, message: dict) -> int:
"""
- Broadcast a message to all connections for a specific job.
- Returns the number of successful broadcasts.
+ Broadcast a message to all connections for a specific job across ALL pods.
+
+ If Redis is configured, publishes to Redis pub/sub which then broadcasts
+ to all pods. Otherwise, falls back to local-only broadcast.
+
+ Returns the number of successful local broadcasts.
"""
# Store the latest event for this job to provide initial state to new connections
- if message.get('type') != 'initial_state': # Don't store initial_state messages
+ if message.get('type') != 'initial_state':
self._latest_events[job_id] = message
+ # If Redis is available, publish to Redis for cross-pod broadcast
+ if self._redis:
+ try:
+ payload = json.dumps({
+ 'job_id': job_id,
+ 'message': message,
+ 'source_instance': self._instance_id
+ })
+ await self._redis.publish(REDIS_WEBSOCKET_CHANNEL, payload)
+ logger.debug("Published WebSocket event to Redis",
+ job_id=job_id,
+ message_type=message.get('type'),
+ instance_id=self._instance_id)
+ # Return 0 here because the actual broadcast happens via subscriber
+ # The count will be from _broadcast_local when the message is received
+ return 0
+ except Exception as e:
+ logger.warning("Failed to publish to Redis, falling back to local broadcast",
+ error=str(e),
+ job_id=job_id)
+ # Fall through to local broadcast
+
+ # Local-only broadcast (when Redis is not available)
+ return await self._broadcast_local(job_id, message)
+
+ async def _broadcast_local(self, job_id: str, message: dict) -> int:
+ """
+ Broadcast a message to local WebSocket connections only.
+ This is called either directly (no Redis) or from Redis subscriber.
+ """
if job_id not in self._connections:
- logger.debug("No active connections for job", job_id=job_id)
+ logger.debug("No active local connections for job",
+ job_id=job_id,
+ instance_id=self._instance_id)
return 0
connections = list(self._connections[job_id].values())
@@ -103,18 +274,27 @@ class WebSocketConnectionManager:
self._connections[job_id].pop(ws_id, None)
if successful_sends > 0:
- logger.info("Broadcasted message to WebSocket clients",
+ logger.info("Broadcasted message to local WebSocket clients",
job_id=job_id,
message_type=message.get('type'),
successful_sends=successful_sends,
- failed_sends=len(failed_websockets))
+ failed_sends=len(failed_websockets),
+ instance_id=self._instance_id)
return successful_sends
def get_connection_count(self, job_id: str) -> int:
- """Get the number of active connections for a job"""
+ """Get the number of active local connections for a job"""
return len(self._connections.get(job_id, {}))
+ def get_total_connection_count(self) -> int:
+ """Get total number of active connections across all jobs"""
+ return sum(len(conns) for conns in self._connections.values())
+
+ def is_redis_enabled(self) -> bool:
+ """Check if Redis pub/sub is enabled"""
+ return self._redis is not None and self._running
+
# Global singleton instance
websocket_manager = WebSocketConnectionManager()
diff --git a/services/training/migrations/versions/add_horizontal_scaling_constraints.py b/services/training/migrations/versions/add_horizontal_scaling_constraints.py
new file mode 100644
index 00000000..25cf1696
--- /dev/null
+++ b/services/training/migrations/versions/add_horizontal_scaling_constraints.py
@@ -0,0 +1,60 @@
+"""Add horizontal scaling constraints for multi-pod deployment
+
+Revision ID: add_horizontal_scaling
+Revises: 26a665cd5348
+Create Date: 2025-01-18
+
+This migration adds database-level constraints to prevent race conditions
+when running multiple training service pods:
+
+1. Partial unique index on model_training_logs to prevent duplicate active jobs per tenant
+2. Index to speed up active job lookups
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'add_horizontal_scaling'
+down_revision: Union[str, None] = '26a665cd5348'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ # Add partial unique index to prevent duplicate active training jobs per tenant
+ # This ensures only ONE job can be in 'pending' or 'running' status per tenant at a time
+ # The constraint is enforced at the database level, preventing race conditions
+ # between multiple pods checking and creating jobs simultaneously
+ op.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_active_training_per_tenant
+ ON model_training_logs (tenant_id)
+ WHERE status IN ('pending', 'running')
+ """)
+
+ # Add index to speed up active job lookups (used by deduplication check)
+ op.create_index(
+ 'idx_training_logs_tenant_status',
+ 'model_training_logs',
+ ['tenant_id', 'status'],
+ unique=False,
+ if_not_exists=True
+ )
+
+ # Add index for job recovery queries (find stale running jobs)
+ op.create_index(
+ 'idx_training_logs_status_updated',
+ 'model_training_logs',
+ ['status', 'updated_at'],
+ unique=False,
+ if_not_exists=True
+ )
+
+
+def downgrade() -> None:
+ # Remove the indexes in reverse order
+ op.execute("DROP INDEX IF EXISTS idx_training_logs_status_updated")
+ op.execute("DROP INDEX IF EXISTS idx_training_logs_tenant_status")
+ op.execute("DROP INDEX IF EXISTS idx_unique_active_training_per_tenant")
diff --git a/shared/leader_election/__init__.py b/shared/leader_election/__init__.py
new file mode 100644
index 00000000..014a4a39
--- /dev/null
+++ b/shared/leader_election/__init__.py
@@ -0,0 +1,33 @@
+"""
+Shared Leader Election for Bakery-IA platform
+
+Provides Redis-based leader election for services that need to run
+singleton scheduled tasks (APScheduler, background jobs, etc.)
+
+Usage:
+ from shared.leader_election import LeaderElectionService, SchedulerLeaderMixin
+
+ # Option 1: Direct usage
+ leader_election = LeaderElectionService(redis_client, "my-service")
+ await leader_election.start(
+ on_become_leader=start_scheduler,
+ on_lose_leader=stop_scheduler
+ )
+
+ # Option 2: Mixin for services with APScheduler
+ class MySchedulerService(SchedulerLeaderMixin):
+ async def _create_scheduler_jobs(self):
+ self.scheduler.add_job(...)
+"""
+
+from shared.leader_election.service import (
+ LeaderElectionService,
+ LeaderElectionConfig,
+)
+from shared.leader_election.mixin import SchedulerLeaderMixin
+
+__all__ = [
+ "LeaderElectionService",
+ "LeaderElectionConfig",
+ "SchedulerLeaderMixin",
+]
diff --git a/shared/leader_election/mixin.py b/shared/leader_election/mixin.py
new file mode 100644
index 00000000..52c1555d
--- /dev/null
+++ b/shared/leader_election/mixin.py
@@ -0,0 +1,209 @@
+"""
+Scheduler Leader Mixin
+
+Provides a mixin class for services that use APScheduler and need
+leader election for horizontal scaling.
+
+Usage:
+ class MySchedulerService(SchedulerLeaderMixin):
+ def __init__(self, redis_url: str, service_name: str):
+ super().__init__(redis_url, service_name)
+ # Your initialization here
+
+ async def _create_scheduler_jobs(self):
+ '''Override to define your scheduled jobs'''
+ self.scheduler.add_job(
+ self.my_job,
+ trigger=CronTrigger(hour=0),
+ id='my_job'
+ )
+
+ async def my_job(self):
+ # Your job logic here
+ pass
+"""
+
+import asyncio
+from typing import Optional
+from abc import abstractmethod
+import structlog
+
+logger = structlog.get_logger()
+
+
+class SchedulerLeaderMixin:
+ """
+ Mixin for services that use APScheduler with leader election.
+
+ Provides automatic leader election and scheduler management.
+ Only the leader pod will run scheduled jobs.
+ """
+
+ def __init__(self, redis_url: str, service_name: str, **kwargs):
+ """
+ Initialize the scheduler with leader election.
+
+ Args:
+ redis_url: Redis connection URL for leader election
+ service_name: Unique service name for leader election lock
+ **kwargs: Additional arguments passed to parent class
+ """
+ super().__init__(**kwargs)
+
+ self._redis_url = redis_url
+ self._service_name = service_name
+ self._leader_election = None
+ self._redis_client = None
+ self.scheduler = None
+ self._scheduler_started = False
+
+ async def start_with_leader_election(self):
+ """
+ Start the service with leader election.
+
+ Only the leader will start the scheduler.
+ """
+ from apscheduler.schedulers.asyncio import AsyncIOScheduler
+ from shared.leader_election.service import LeaderElectionService
+ import redis.asyncio as redis
+
+ try:
+ # Create Redis connection
+ self._redis_client = redis.from_url(self._redis_url, decode_responses=False)
+ await self._redis_client.ping()
+
+ # Create scheduler (but don't start it yet)
+ self.scheduler = AsyncIOScheduler()
+
+ # Create leader election
+ self._leader_election = LeaderElectionService(
+ self._redis_client,
+ self._service_name
+ )
+
+ # Start leader election with callbacks
+ await self._leader_election.start(
+ on_become_leader=self._on_become_leader,
+ on_lose_leader=self._on_lose_leader
+ )
+
+ logger.info("Scheduler service started with leader election",
+ service=self._service_name,
+ is_leader=self._leader_election.is_leader,
+ instance_id=self._leader_election.instance_id)
+
+ except Exception as e:
+ logger.error("Failed to start with leader election, falling back to standalone",
+ service=self._service_name,
+ error=str(e))
+ # Fallback: start scheduler anyway (for single-pod deployments)
+ await self._start_scheduler_standalone()
+
+ async def _on_become_leader(self):
+ """Called when this instance becomes the leader"""
+ logger.info("Became leader, starting scheduler",
+ service=self._service_name)
+ await self._start_scheduler()
+
+ async def _on_lose_leader(self):
+ """Called when this instance loses leadership"""
+ logger.warning("Lost leadership, stopping scheduler",
+ service=self._service_name)
+ await self._stop_scheduler()
+
+ async def _start_scheduler(self):
+ """Start the scheduler with defined jobs"""
+ if self._scheduler_started:
+ logger.warning("Scheduler already started",
+ service=self._service_name)
+ return
+
+ try:
+ # Let subclass define jobs
+ await self._create_scheduler_jobs()
+
+ # Start scheduler
+ if not self.scheduler.running:
+ self.scheduler.start()
+ self._scheduler_started = True
+ logger.info("Scheduler started",
+ service=self._service_name,
+ job_count=len(self.scheduler.get_jobs()))
+
+ except Exception as e:
+ logger.error("Failed to start scheduler",
+ service=self._service_name,
+ error=str(e))
+
+ async def _stop_scheduler(self):
+ """Stop the scheduler"""
+ if not self._scheduler_started:
+ return
+
+ try:
+ if self.scheduler and self.scheduler.running:
+ self.scheduler.shutdown(wait=False)
+ self._scheduler_started = False
+ logger.info("Scheduler stopped",
+ service=self._service_name)
+
+ except Exception as e:
+ logger.error("Failed to stop scheduler",
+ service=self._service_name,
+ error=str(e))
+
+ async def _start_scheduler_standalone(self):
+ """Start scheduler without leader election (fallback mode)"""
+ from apscheduler.schedulers.asyncio import AsyncIOScheduler
+
+ logger.warning("Starting scheduler in standalone mode (no leader election)",
+ service=self._service_name)
+
+ self.scheduler = AsyncIOScheduler()
+ await self._create_scheduler_jobs()
+
+ if not self.scheduler.running:
+ self.scheduler.start()
+ self._scheduler_started = True
+
+ @abstractmethod
+ async def _create_scheduler_jobs(self):
+ """
+ Override to define scheduled jobs.
+
+ Example:
+ self.scheduler.add_job(
+ self.my_task,
+ trigger=CronTrigger(hour=0, minute=30),
+ id='my_task',
+ max_instances=1
+ )
+ """
+ pass
+
+ async def stop(self):
+ """Stop the scheduler and leader election"""
+ # Stop leader election
+ if self._leader_election:
+ await self._leader_election.stop()
+
+ # Stop scheduler
+ await self._stop_scheduler()
+
+ # Close Redis
+ if self._redis_client:
+ await self._redis_client.close()
+
+ logger.info("Scheduler service stopped",
+ service=self._service_name)
+
+ @property
+ def is_leader(self) -> bool:
+ """Check if this instance is the leader"""
+        # In standalone fallback mode there is no election, so this instance
+        # effectively acts as the leader (consistent with get_leader_status)
+        return self._leader_election.is_leader if self._leader_election else True
+
+ def get_leader_status(self) -> dict:
+ """Get leader election status"""
+ if self._leader_election:
+ return self._leader_election.get_status()
+ return {"is_leader": True, "mode": "standalone"}
diff --git a/shared/leader_election/service.py b/shared/leader_election/service.py
new file mode 100644
index 00000000..cf6d49db
--- /dev/null
+++ b/shared/leader_election/service.py
@@ -0,0 +1,352 @@
+"""
+Leader Election Service
+
+Implements Redis-based leader election to ensure only ONE pod runs
+singleton tasks like APScheduler jobs.
+
+This is CRITICAL for horizontal scaling - without leader election,
+each pod would run the same scheduled jobs, causing:
+- Duplicate operations (forecasts, alerts, syncs)
+- Database contention
+- Inconsistent state
+- Duplicate notifications
+
+Implementation:
+- Uses Redis SET NX (set if not exists) for atomic leadership acquisition
+- Leader maintains leadership with periodic heartbeats
+- If leader fails to heartbeat, another pod can take over
+- Non-leader pods check periodically if they should become leader
+"""
+
+import asyncio
+import os
+import socket
+from dataclasses import dataclass
+from typing import Optional, Callable, Awaitable
+import structlog
+
+logger = structlog.get_logger()
+
+
+@dataclass
+class LeaderElectionConfig:
+ """Configuration for leader election"""
+ # Redis key prefix for the lock
+ lock_key_prefix: str = "leader"
+ # Lock expires after this many seconds without refresh
+ lock_ttl_seconds: int = 30
+ # Refresh lock every N seconds (should be < lock_ttl_seconds / 2)
+ heartbeat_interval_seconds: int = 10
+ # Non-leaders check for leadership every N seconds
+ election_check_interval_seconds: int = 15
+
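+# Rough failover arithmetic for the defaults above (a sketch, not a guarantee):
+# the leader refreshes the lock every 10s and the lock expires 30s after the
+# last refresh, so a crashed leader's lock disappears within ~30s; a standby
+# then notices on its next election check (every 15s), giving roughly 45s
+# worst-case failover. A caller wanting faster takeover could pass a tighter
+# config, for example:
+#
+#   fast_failover = LeaderElectionConfig(
+#       lock_ttl_seconds=15,
+#       heartbeat_interval_seconds=5,
+#       election_check_interval_seconds=5,
+#   )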
+
+class LeaderElectionService:
+ """
+ Redis-based leader election service.
+
+ Ensures only one pod runs scheduled tasks at a time across all replicas.
+ """
+
+ def __init__(
+ self,
+ redis_client,
+ service_name: str,
+ config: Optional[LeaderElectionConfig] = None
+ ):
+ """
+ Initialize leader election service.
+
+ Args:
+ redis_client: Async Redis client instance
+ service_name: Unique name for this service (used in Redis key)
+ config: Optional configuration override
+ """
+ self.redis = redis_client
+ self.service_name = service_name
+ self.config = config or LeaderElectionConfig()
+ self.lock_key = f"{self.config.lock_key_prefix}:{service_name}:lock"
+ self.instance_id = self._generate_instance_id()
+ self.is_leader = False
+ self._heartbeat_task: Optional[asyncio.Task] = None
+ self._election_task: Optional[asyncio.Task] = None
+ self._running = False
+ self._on_become_leader_callback: Optional[Callable[[], Awaitable[None]]] = None
+ self._on_lose_leader_callback: Optional[Callable[[], Awaitable[None]]] = None
+
+ def _generate_instance_id(self) -> str:
+ """Generate unique instance identifier for this pod"""
+ hostname = os.environ.get('HOSTNAME', socket.gethostname())
+ pod_ip = os.environ.get('POD_IP', 'unknown')
+ return f"{hostname}:{pod_ip}:{os.getpid()}"
+
+ async def start(
+ self,
+ on_become_leader: Optional[Callable[[], Awaitable[None]]] = None,
+ on_lose_leader: Optional[Callable[[], Awaitable[None]]] = None
+ ):
+ """
+ Start leader election process.
+
+ Args:
+ on_become_leader: Async callback when this instance becomes leader
+ on_lose_leader: Async callback when this instance loses leadership
+ """
+ self._on_become_leader_callback = on_become_leader
+ self._on_lose_leader_callback = on_lose_leader
+ self._running = True
+
+ logger.info("Starting leader election",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ lock_key=self.lock_key)
+
+ # Try to become leader immediately
+ await self._try_become_leader()
+
+ # Start background tasks
+ if self.is_leader:
+ self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
+ else:
+ self._election_task = asyncio.create_task(self._election_loop())
+
+ async def stop(self):
+ """Stop leader election and release leadership if held"""
+ self._running = False
+
+ # Cancel background tasks
+ if self._heartbeat_task:
+ self._heartbeat_task.cancel()
+ try:
+ await self._heartbeat_task
+ except asyncio.CancelledError:
+ pass
+ self._heartbeat_task = None
+
+ if self._election_task:
+ self._election_task.cancel()
+ try:
+ await self._election_task
+ except asyncio.CancelledError:
+ pass
+ self._election_task = None
+
+        # Release leadership (capture the flag first; _release_leadership() resets it)
+        was_leader = self.is_leader
+        if was_leader:
+            await self._release_leadership()
+
+        logger.info("Leader election stopped",
+                    service=self.service_name,
+                    instance_id=self.instance_id,
+                    was_leader=was_leader)
+
+ async def _try_become_leader(self) -> bool:
+ """
+ Attempt to become the leader.
+
+ Returns:
+ True if this instance is now the leader
+ """
+ try:
+ # Try to set the lock with NX (only if not exists) and EX (expiry)
+ acquired = await self.redis.set(
+ self.lock_key,
+ self.instance_id,
+ nx=True, # Only set if not exists
+ ex=self.config.lock_ttl_seconds
+ )
+
+ if acquired:
+ self.is_leader = True
+ logger.info("Became leader",
+ service=self.service_name,
+ instance_id=self.instance_id)
+
+ # Call callback
+ if self._on_become_leader_callback:
+ try:
+ await self._on_become_leader_callback()
+ except Exception as e:
+ logger.error("Error in on_become_leader callback",
+ service=self.service_name,
+ error=str(e))
+
+ return True
+
+ # Check if we're already the leader (reconnection scenario)
+ current_leader = await self.redis.get(self.lock_key)
+ if current_leader:
+ current_leader_str = current_leader.decode() if isinstance(current_leader, bytes) else current_leader
+ if current_leader_str == self.instance_id:
+ self.is_leader = True
+ logger.info("Confirmed as existing leader",
+ service=self.service_name,
+ instance_id=self.instance_id)
+ return True
+ else:
+ logger.debug("Another instance is leader",
+ service=self.service_name,
+ current_leader=current_leader_str,
+ this_instance=self.instance_id)
+
+ return False
+
+ except Exception as e:
+ logger.error("Failed to acquire leadership",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ error=str(e))
+ return False
+
+ async def _release_leadership(self):
+ """Release leadership lock"""
+ try:
+ # Only delete if we're the current leader
+ current_leader = await self.redis.get(self.lock_key)
+ if current_leader:
+ current_leader_str = current_leader.decode() if isinstance(current_leader, bytes) else current_leader
+ if current_leader_str == self.instance_id:
+ await self.redis.delete(self.lock_key)
+ logger.info("Released leadership",
+ service=self.service_name,
+ instance_id=self.instance_id)
+
+ was_leader = self.is_leader
+ self.is_leader = False
+
+ # Call callback only if we were the leader
+ if was_leader and self._on_lose_leader_callback:
+ try:
+ await self._on_lose_leader_callback()
+ except Exception as e:
+ logger.error("Error in on_lose_leader callback",
+ service=self.service_name,
+ error=str(e))
+
+ except Exception as e:
+ logger.error("Failed to release leadership",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ error=str(e))
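+
+    # Note: an illustrative hardening, not part of the original design. The
+    # GET-then-DELETE above is not atomic, so in a rare race the key could
+    # expire and be re-acquired by another instance between the two calls,
+    # and that instance's lock would then be deleted. A Redis Lua script does
+    # the compare-and-delete in a single server-side step, roughly:
+    #
+    #   _RELEASE_LUA = """
+    #   if redis.call("get", KEYS[1]) == ARGV[1] then
+    #       return redis.call("del", KEYS[1])
+    #   end
+    #   return 0
+    #   """
+    #   await self.redis.eval(_RELEASE_LUA, 1, self.lock_key, self.instance_id)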
+
+ async def _refresh_leadership(self) -> bool:
+ """
+ Refresh leadership lock TTL.
+
+ Returns:
+ True if leadership was maintained
+ """
+ try:
+ # Verify we're still the leader
+ current_leader = await self.redis.get(self.lock_key)
+ if not current_leader:
+ logger.warning("Lost leadership (lock expired)",
+ service=self.service_name,
+ instance_id=self.instance_id)
+ return False
+
+ current_leader_str = current_leader.decode() if isinstance(current_leader, bytes) else current_leader
+ if current_leader_str != self.instance_id:
+ logger.warning("Lost leadership (lock held by another instance)",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ current_leader=current_leader_str)
+ return False
+
+ # Refresh the TTL
+ await self.redis.expire(self.lock_key, self.config.lock_ttl_seconds)
+ return True
+
+ except Exception as e:
+ logger.error("Failed to refresh leadership",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ error=str(e))
+ return False
+
+ async def _heartbeat_loop(self):
+ """Background loop to maintain leadership"""
+ while self._running and self.is_leader:
+ try:
+ await asyncio.sleep(self.config.heartbeat_interval_seconds)
+
+ if not self._running:
+ break
+
+ maintained = await self._refresh_leadership()
+
+ if not maintained:
+ self.is_leader = False
+
+ # Call callback
+ if self._on_lose_leader_callback:
+ try:
+ await self._on_lose_leader_callback()
+ except Exception as e:
+ logger.error("Error in on_lose_leader callback",
+ service=self.service_name,
+ error=str(e))
+
+ # Switch to election loop
+ self._election_task = asyncio.create_task(self._election_loop())
+ break
+
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.error("Error in heartbeat loop",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ error=str(e))
+
+ async def _election_loop(self):
+ """Background loop to attempt leadership acquisition"""
+ while self._running and not self.is_leader:
+ try:
+ await asyncio.sleep(self.config.election_check_interval_seconds)
+
+ if not self._running:
+ break
+
+ acquired = await self._try_become_leader()
+
+ if acquired:
+ # Switch to heartbeat loop
+ self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
+ break
+
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.error("Error in election loop",
+ service=self.service_name,
+ instance_id=self.instance_id,
+ error=str(e))
+
+ def get_status(self) -> dict:
+ """Get current leader election status"""
+ return {
+ "service": self.service_name,
+ "instance_id": self.instance_id,
+ "is_leader": self.is_leader,
+ "running": self._running,
+ "lock_key": self.lock_key,
+ "config": {
+ "lock_ttl_seconds": self.config.lock_ttl_seconds,
+ "heartbeat_interval_seconds": self.config.heartbeat_interval_seconds,
+ "election_check_interval_seconds": self.config.election_check_interval_seconds
+ }
+ }
+
+ async def get_current_leader(self) -> Optional[str]:
+ """Get the current leader instance ID (if any)"""
+ try:
+ current_leader = await self.redis.get(self.lock_key)
+ if current_leader:
+ return current_leader.decode() if isinstance(current_leader, bytes) else current_leader
+ return None
+ except Exception as e:
+ logger.error("Failed to get current leader",
+ service=self.service_name,
+ error=str(e))
+ return None