From 21d35ea92b6d16af1cf7fa2f192ac611c975d576 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Sun, 18 Jan 2026 09:02:27 +0100 Subject: [PATCH] Add ci/cd and fix multiple pods issues --- CI_CD_IMPLEMENTATION_PLAN.md | 1206 +++++++++++++++++ INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md | 413 ++++++ infrastructure/ci-cd/README.md | 294 ++++ infrastructure/ci-cd/flux/git-repository.yaml | 16 + infrastructure/ci-cd/flux/kustomization.yaml | 27 + infrastructure/ci-cd/gitea/ingress.yaml | 25 + infrastructure/ci-cd/gitea/values.yaml | 38 + .../ci-cd/monitoring/otel-collector.yaml | 70 + .../ci-cd/tekton/pipelines/ci-pipeline.yaml | 83 ++ .../ci-cd/tekton/tasks/detect-changes.yaml | 64 + .../ci-cd/tekton/tasks/git-clone.yaml | 31 + .../ci-cd/tekton/tasks/kaniko-build.yaml | 40 + .../ci-cd/tekton/tasks/update-gitops.yaml | 66 + .../ci-cd/tekton/triggers/event-listener.yaml | 26 + .../tekton/triggers/gitlab-interceptor.yaml | 14 + .../tekton/triggers/trigger-binding.yaml | 16 + .../tekton/triggers/trigger-template.yaml | 43 + services/orchestrator/app/main.py | 129 +- .../app/services/delivery_tracking_service.py | 127 +- services/training/app/main.py | 79 +- .../repositories/training_log_repository.py | 164 ++- .../training/app/utils/distributed_lock.py | 23 +- services/training/app/websocket/manager.py | 204 ++- .../add_horizontal_scaling_constraints.py | 60 + shared/leader_election/__init__.py | 33 + shared/leader_election/mixin.py | 209 +++ shared/leader_election/service.py | 352 +++++ 27 files changed, 3779 insertions(+), 73 deletions(-) create mode 100644 CI_CD_IMPLEMENTATION_PLAN.md create mode 100644 INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md create mode 100644 infrastructure/ci-cd/README.md create mode 100644 infrastructure/ci-cd/flux/git-repository.yaml create mode 100644 infrastructure/ci-cd/flux/kustomization.yaml create mode 100644 infrastructure/ci-cd/gitea/ingress.yaml create mode 100644 infrastructure/ci-cd/gitea/values.yaml create mode 100644 infrastructure/ci-cd/monitoring/otel-collector.yaml create mode 100644 infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml create mode 100644 infrastructure/ci-cd/tekton/tasks/detect-changes.yaml create mode 100644 infrastructure/ci-cd/tekton/tasks/git-clone.yaml create mode 100644 infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml create mode 100644 infrastructure/ci-cd/tekton/tasks/update-gitops.yaml create mode 100644 infrastructure/ci-cd/tekton/triggers/event-listener.yaml create mode 100644 infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml create mode 100644 infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml create mode 100644 infrastructure/ci-cd/tekton/triggers/trigger-template.yaml create mode 100644 services/training/migrations/versions/add_horizontal_scaling_constraints.py create mode 100644 shared/leader_election/__init__.py create mode 100644 shared/leader_election/mixin.py create mode 100644 shared/leader_election/service.py diff --git a/CI_CD_IMPLEMENTATION_PLAN.md b/CI_CD_IMPLEMENTATION_PLAN.md new file mode 100644 index 00000000..2cf75201 --- /dev/null +++ b/CI_CD_IMPLEMENTATION_PLAN.md @@ -0,0 +1,1206 @@ +# Bakery-IA Production CI/CD Implementation Plan + +## Document Overview +**Status**: Draft +**Version**: 1.0 +**Date**: 2024-07-15 +**Author**: Mistral Vibe + +This document outlines the production-grade CI/CD architecture for Bakery-IA and provides a step-by-step implementation plan without requiring immediate code changes. + +## Table of Contents +1. [Current State Analysis](#current-state-analysis) +2. 
[Target Architecture](#target-architecture) +3. [Implementation Strategy](#implementation-strategy) +4. [Phase 1: Infrastructure Setup](#phase-1-infrastructure-setup) +5. [Phase 2: CI/CD Pipeline Configuration](#phase-2-cicd-pipeline-configuration) +6. [Phase 3: Monitoring and Observability](#phase-3-monitoring-and-observability) +7. [Phase 4: Testing and Validation](#phase-4-testing-and-validation) +8. [Phase 5: Rollout and Migration](#phase-5-rollback-and-migration) +9. [Risk Assessment](#risk-assessment) +10. [Success Metrics](#success-metrics) +11. [Appendices](#appendices) + +--- + +## Current State Analysis + +### Existing Infrastructure +- **Microservices**: 19 services in `services/` directory +- **Frontend**: React application in `frontend/` +- **Gateway**: API gateway in `gateway/` +- **Databases**: 22 PostgreSQL instances + Redis + RabbitMQ +- **Storage**: MinIO for object storage +- **Monitoring**: SigNoz already deployed +- **Target Platform**: MicroK8s on Clouding.io VPS + +### Current Deployment Process +- Manual builds using Tiltfile/Skaffold (local only) +- Manual image pushes to local registry or Docker Hub +- Manual kubectl apply commands +- No automated testing gates +- No rollback mechanism + +### Pain Points +- "Works on my machine" issues +- No audit trail of deployments +- Time-consuming manual processes +- Risk of human error +- No automated testing in pipeline + +--- + +## Target Architecture + +### High-Level Architecture Diagram + +```mermaid +graph TD + A[Developer Workstation] -->|Push Code| B[Gitea Git Server] + B -->|Webhook| C[Tekton Pipelines] + C -->|Build/Test| D[Gitea Container Registry] + D -->|New Image| E[Flux CD] + E -->|Git Commit| B + E -->|kubectl apply| F[MicroK8s Cluster] + F -->|Metrics/Logs| G[SigNoz Monitoring] +``` + +### How CI/CD Tools Run in Kubernetes +Yes, they are individual container images running as pods in your MicroK8s cluster, just like your application services. + +```mermaid +graph TB + subgraph "MicroK8s Cluster (Your VPS)" + subgraph "Namespace: gitea" + A1[Pod: gitea
Image: gitea/gitea:latest] + A2[Pod: gitea-postgresql
Image: postgres:15] + A3[PVC: gitea-data] + end + + subgraph "Namespace: tekton-pipelines" + B1[Pod: tekton-pipelines-controller
Image: gcr.io/tekton-releases/...] + B2[Pod: tekton-pipelines-webhook
Image: gcr.io/tekton-releases/...] + B3[Pod: tekton-triggers-controller
Image: gcr.io/tekton-releases/...] + end + + subgraph "Namespace: flux-system" + C1[Pod: source-controller
Image: ghcr.io/fluxcd/...] + C2[Pod: kustomize-controller
Image: ghcr.io/fluxcd/...] + C3[Pod: helm-controller
Image: ghcr.io/fluxcd/...] + end + + subgraph "Namespace: bakery-ia (YOUR APP)" + D1[19 services + 22 databases + Redis + RabbitMQ + MinIO] + end + end +``` + +### Component Breakdown + +#### 1. Gitea (Git Server + Registry) +- **Purpose**: Replace GitHub dependency +- **Namespace**: `gitea` +- **Resources**: ~768MB RAM (512MB Gitea + 256MB PostgreSQL) +- **Storage**: PVC for repositories and registry +- **Access**: Internal DNS `gitea.bakery-ia.local` +- **LeaderElectionService**: Gitea handles leader election internally for high availability scenarios + +#### 2. Tekton (CI Pipelines) +- **Purpose**: Build, test, and push container images +- **Namespace**: `tekton-pipelines` +- **Resources**: ~650MB baseline + 512MB per build +- **Key Features**: + - Path-based change detection + - Parallel builds for independent services + - Kaniko for in-cluster image building + - Integration with Gitea registry +- **LeaderElectionService**: Tekton controllers use leader election to ensure high availability + +#### 3. Flux CD (GitOps Deployment) +- **Purpose**: Automated deployments from Git +- **Namespace**: `flux-system` +- **Resources**: ~230MB baseline +- **Key Features**: + - Pull-based deployments (no webhooks needed) + - Kustomize support for your existing overlays + - Image automation for rolling updates + - Drift detection and correction +- **LeaderElectionService**: Flux controllers use leader election to ensure only one active controller + +#### 4. SigNoz (Monitoring) +- **Purpose**: Observability for CI/CD and applications +- **Integration Points**: + - Tekton pipeline metrics + - Flux reconciliation events + - Kubernetes resource metrics + - Application performance monitoring + +### Deployment Methods for Each Tool + +#### 1. Flux (Easiest - Built into MicroK8s) + +```bash +# One command - MicroK8s has it built-in +microk8s enable fluxcd + +# This creates: +# - Namespace: flux-system +# - Deployments: source-controller, kustomize-controller, helm-controller, notification-controller +# - CRDs: GitRepository, Kustomization, HelmRelease, etc. +``` + +**Images pulled:** +- `ghcr.io/fluxcd/source-controller:v1.x.x` +- `ghcr.io/fluxcd/kustomize-controller:v1.x.x` +- `ghcr.io/fluxcd/helm-controller:v0.x.x` +- `ghcr.io/fluxcd/notification-controller:v1.x.x` + +#### 2. Tekton (kubectl apply or Helm) + +```bash +# Option A: Direct apply (official releases) +kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml +kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml +kubectl apply -f https://storage.googleapis.com/tekton-releases/dashboard/latest/release.yaml + +# Option B: Helm chart +helm repo add tekton https://tekton.dev/charts +helm install tekton-pipelines tekton/tekton-pipelines -n tekton-pipelines --create-namespace +``` + +**Images pulled:** +- `gcr.io/tekton-releases/github.com/tektoncd/pipeline/cmd/controller:v0.x.x` +- `gcr.io/tekton-releases/github.com/tektoncd/pipeline/cmd/webhook:v0.x.x` +- `gcr.io/tekton-releases/github.com/tektoncd/triggers/cmd/controller:v0.x.x` +- `gcr.io/tekton-releases/github.com/tektoncd/dashboard/cmd/dashboard:v0.x.x` + +#### 3. 
Gitea (Helm chart) + +```bash +# Add Helm repo +helm repo add gitea https://dl.gitea.io/charts + +# Install with custom values +helm install gitea gitea/gitea \ + -n gitea --create-namespace \ + -f gitea-values.yaml +``` + +**Images pulled:** +- `gitea/gitea:1.x.x` +- `postgres:15-alpine` (or bundled) + +--- + +## Complete Deployment Architecture + +```mermaid +graph TB + subgraph "Your Git Repository
(Initially in GitHub, then Gitea)" + A[bakery-ia/
├── services/
├── frontend/
├── gateway/
├── infrastructure/
│ ├── kubernetes/
│ │ ├── base/
│ │ └── overlays/
│ │ ├── dev/
│ │ └── prod/
│ └── ci-cd/
│ ├── gitea/
│ ├── tekton/
│ └── flux/
└── tekton/
└── pipeline.yaml] + end + + A --> B[Gitea
Self-hosted Git
Stores code
Triggers webhook] + + B --> C[Tekton
EventListener
TriggerTemplate
PipelineRun] + + C --> D[Pipeline Steps
├── clone
├── detect changes
├── test
├── build
└── push] + + D --> E[Gitea Registry
gitea:5000/bakery/
auth-service:abc123] + + E --> F[Flux
source-controller
kustomize-controller
kubectl apply] + + F --> G[Your Application
bakery-ia namespace
Updated services] +``` + +### Guiding Principles +1. **No Code Changes Required**: Use existing codebase as-is +2. **Incremental Rollout**: Phase-based implementation +3. **Zero Downtime**: Parallel run with existing manual process +4. **Observability First**: Monitor before automating +5. **Security by Design**: Secrets management from day one + +### Implementation Phases + +```mermaid +gantt + title CI/CD Implementation Timeline + dateFormat YYYY-MM-DD + section Phase 1: Infrastructure + Infrastructure Setup :a1, 2024-07-15, 7d + section Phase 2: CI/CD Config + Pipeline Configuration :a2, 2024-07-22, 10d + section Phase 3: Monitoring + SigNoz Integration :a3, 2024-08-01, 5d + section Phase 4: Testing + Validation Testing :a4, 2024-08-06, 7d + section Phase 5: Rollout + Production Migration :a5, 2024-08-13, 5d +``` + +## Step-by-Step: How to Deploy CI/CD to Production + +### Phase 1: Bootstrap (One-time setup on VPS) + +```bash +# SSH to your VPS +ssh user@your-clouding-vps + +# 1. Enable Flux (built into MicroK8s) +microk8s enable fluxcd + +# 2. Install Tekton +microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml +microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml + +# 3. Install Gitea via Helm +microk8s helm repo add gitea https://dl.gitea.io/charts +microk8s helm install gitea gitea/gitea -n gitea --create-namespace -f gitea-values.yaml + +# 4. Verify all running +microk8s kubectl get pods -A | grep -E "gitea|tekton|flux" +``` + +After this, you have: + +``` +NAMESPACE NAME READY STATUS +gitea gitea-0 1/1 Running +gitea gitea-postgresql-0 1/1 Running +tekton-pipelines tekton-pipelines-controller-xxx 1/1 Running +tekton-pipelines tekton-pipelines-webhook-xxx 1/1 Running +tekton-pipelines tekton-triggers-controller-xxx 1/1 Running +flux-system source-controller-xxx 1/1 Running +flux-system kustomize-controller-xxx 1/1 Running +flux-system helm-controller-xxx 1/1 Running +``` + +### Phase 2: Configure Flux to Watch Your Repo + +```yaml +# infrastructure/ci-cd/flux/gitrepository.yaml +apiVersion: source.toolkit.fluxcd.io/v1 +kind: GitRepository +metadata: + name: bakery-ia + namespace: flux-system +spec: + interval: 1m + url: https://gitea.bakery-ia.local/bakery/bakery-ia.git + ref: + branch: main + secretRef: + name: gitea-credentials # Git credentials +--- +# infrastructure/ci-cd/flux/kustomization.yaml +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: bakery-ia-prod + namespace: flux-system +spec: + interval: 5m + path: ./infrastructure/kubernetes/overlays/prod + prune: true + sourceRef: + kind: GitRepository + name: bakery-ia + targetNamespace: bakery-ia +``` + +### Phase 3: Configure Tekton Pipeline + +```yaml +# tekton/pipeline.yaml +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + name: bakery-ia-ci + namespace: tekton-pipelines +spec: + params: + - name: git-url + - name: git-revision + - name: changed-services + type: array + + workspaces: + - name: source + - name: docker-credentials + + tasks: + - name: clone + taskRef: + name: git-clone + workspaces: + - name: output + workspace: source + params: + - name: url + value: $(params.git-url) + - name: revision + value: $(params.git-revision) + + - name: detect-changes + runAfter: [clone] + taskRef: + name: detect-changed-services + workspaces: + - name: source + workspace: source + + - name: build-and-push + runAfter: [detect-changes] + taskRef: + name: kaniko-build + params: + - name: 
services + value: $(tasks.detect-changes.results.changed-services) + workspaces: + - name: source + workspace: source + - name: docker-credentials + workspace: docker-credentials +``` + +## Visual: Complete Production Flow + +```mermaid +graph LR + A[Developer pushes code] --> B[Gitea
Self-hosted Git
• Receives push
• Stores code
• Triggers webhook] + + B -->|webhook POST to tekton-triggers| C[Tekton
EventListener
TriggerTemplate
PipelineRun] + + C --> D[Pipeline Steps
Each step = container in pod:
├── clone
├── detect changes
├── test (pytest)
├── build (kaniko)
└── push (registry)] + + D --> E[Only changed services] + + D --> F[Gitea Registry
gitea:5000/bakery/
auth-service:abc123] + + F -->|Final step: Update image tag in Git
commits new tag to infrastructure/kubernetes/overlays/prod| G[Git commit triggers Flux] + + G --> H[Flux
source-controller
kustomize-controller
kubectl apply
• Detects new image tag in Git
• Renders Kustomize overlay
• Applies to bakery-ia namespace
• Rolling update of changed services] + + H --> I[Your Application
Namespace: bakery-ia
├── auth-service:abc123 ←NEW
├── tenant-svc:def456
└── training-svc:ghi789
Only auth-service was updated (others unchanged)] +``` + +## Where Images Come From + +| Component | Image Source | Notes | +|-----------|--------------|-------| +| Flux | ghcr.io/fluxcd/* | Pulled once, cached locally | +| Tekton | gcr.io/tekton-releases/* | Pulled once, cached locally | +| Gitea | gitea/gitea (Docker Hub) | Pulled once, cached locally | +| Your Services | gitea.local:5000/bakery/* | Built by Tekton, stored in Gitea registry | +| Build Tools | gcr.io/kaniko-project/executor | Used during builds only | + +## Summary: What Lives Where + +```mermaid +graph TB + subgraph "MicroK8s Cluster" + subgraph "Namespace: gitea (CI/CD Infrastructure)
~768MB total" + A1[gitea pod ~512MB RAM] + A2[postgresql pod ~256MB RAM] + end + + subgraph "Namespace: tekton-pipelines (CI/CD Infrastructure)
~650MB baseline" + B1[pipelines-controller ~200MB RAM] + B2[pipelines-webhook ~100MB RAM] + B3[triggers-controller ~150MB RAM] + B4[triggers-webhook ~100MB RAM] + end + + subgraph "Namespace: flux-system (CI/CD Infrastructure)
~230MB baseline" + C1[source-controller ~50MB RAM] + C2[kustomize-controller ~50MB RAM] + C3[helm-controller ~50MB RAM] + C4[notification-controller ~30MB RAM] + end + + subgraph "Namespace: bakery-ia (YOUR APPLICATION)" + D1[19 microservices] + D2[22 PostgreSQL databases] + D3[Redis] + D4[RabbitMQ] + D5[MinIO] + end + end + + note1["CI/CD Total: ~1.5GB baseline"] + note2["During builds: +512MB per concurrent build (Tekton spawns pods)"] +``` + +### Key Points +- Everything runs as pods - Gitea, Tekton, Flux are all containerized +- Pulled from public registries once - then cached on your VPS +- Your app images stay local - built by Tekton, stored in Gitea registry +- No external dependencies after setup - fully self-contained +- Flux pulls from Git - no incoming webhooks needed for deployments + +--- + +## Phase 1: Infrastructure Setup + +### Objective +Deploy CI/CD infrastructure components without affecting existing applications. + +### Step-by-Step Implementation + +#### Step 1: Prepare MicroK8s Cluster +```bash +# SSH to VPS +ssh admin@bakery-ia-vps + +# Verify MicroK8s status +microk8s status + +# Enable required addons +microk8s enable dns storage ingress fluxcd + +# Verify storage class +microk8s kubectl get storageclass +``` + +#### Step 2: Deploy Gitea + +**Create Gitea values file** (`infrastructure/ci-cd/gitea/values.yaml`): +```yaml +service: + type: ClusterIP + httpPort: 3000 + sshPort: 2222 + +persistence: + enabled: true + size: 50Gi + storageClass: "microk8s-hostpath" + +gitea: + config: + server: + DOMAIN: gitea.bakery-ia.local + SSH_DOMAIN: gitea.bakery-ia.local + ROOT_URL: http://gitea.bakery-ia.local + repository: + ENABLE_PUSH_CREATE_USER: true + ENABLE_PUSH_CREATE_ORG: true + registry: + ENABLED: true + +postgresql: + enabled: true + persistence: + size: 20Gi +``` + +**Deploy Gitea**: +```bash +# Add Helm repo +microk8s helm repo add gitea https://dl.gitea.io/charts + +# Create namespace +microk8s kubectl create namespace gitea + +# Install Gitea +microk8s helm install gitea gitea/gitea \ + -n gitea \ + -f infrastructure/ci-cd/gitea/values.yaml +``` + +**Verify Deployment**: +```bash +# Check pods +microk8s kubectl get pods -n gitea + +# Get admin password +microk8s kubectl get secret -n gitea gitea-admin-secret -o jsonpath='{.data.password}' | base64 -d +``` + +#### Step 3: Configure Ingress for Gitea + +**Create Ingress Resource** (`infrastructure/ci-cd/gitea/ingress.yaml`): +```yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: gitea-ingress + namespace: gitea + annotations: + nginx.ingress.kubernetes.io/rewrite-target: / +spec: + rules: + - host: gitea.bakery-ia.local + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: gitea-http + port: + number: 3000 +``` + +**Apply Ingress**: +```bash +microk8s kubectl apply -f infrastructure/ci-cd/gitea/ingress.yaml +``` + +#### Step 4: Migrate Repository from GitHub + +**Manual Migration Steps**: +1. Create new repository in Gitea UI +2. 
Use git mirror to push existing repo: +```bash +# Clone bare repo from GitHub +git clone --bare git@github.com:your-org/bakery-ia.git + +# Push to Gitea +cd bakery-ia.git +git push --mirror http://admin:PASSWORD@gitea.bakery-ia.local/your-org/bakery-ia.git +``` + +#### Step 5: Deploy Tekton + +**Install Tekton Pipelines**: +```bash +# Create namespace +microk8s kubectl create namespace tekton-pipelines + +# Install Tekton Pipelines +microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml + +# Install Tekton Triggers +microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml + +# Install Tekton Dashboard (optional) +microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/dashboard/latest/release.yaml +``` + +**Verify Installation**: +```bash +microk8s kubectl get pods -n tekton-pipelines +``` + +#### Step 6: Configure Tekton for Gitea Integration + +**Create Gitea Webhook Secret**: +```bash +# Generate webhook secret +WEBHOOK_SECRET=$(openssl rand -hex 20) + +# Create secret +microk8s kubectl create secret generic gitea-webhook-secret \ + -n tekton-pipelines \ + --from-literal=secretToken=$WEBHOOK_SECRET +``` + +**Configure Gitea Webhook**: +1. Go to Gitea repository settings +2. Add webhook: + - URL: `http://tekton-triggers.tekton-pipelines.svc.cluster.local:8080` + - Secret: Use the generated `WEBHOOK_SECRET` + - Trigger: Push events + +#### Step 7: Verify Flux Installation + +**Check Flux Components**: +```bash +microk8s kubectl get pods -n flux-system + +# Verify CRDs +microk8s kubectl get crd | grep flux +``` + +--- + +## Phase 2: CI/CD Pipeline Configuration + +### Objective +Configure pipelines to build, test, and deploy services automatically. 
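+
+Before creating the tasks below, it helps to confirm that the Tekton CRDs installed in Phase 1 are present and that the controllers are ready. A quick sanity check (a sketch; the label selector is assumed from the upstream Tekton release manifests):
+
+```bash
+# Confirm the Pipelines and Triggers CRDs are registered
+microk8s kubectl get crd | grep -E "pipelines.tekton.dev|tasks.tekton.dev|eventlisteners.triggers.tekton.dev"
+
+# Wait for the Tekton controllers to become Ready before applying Tasks and Pipelines
+microk8s kubectl wait --for=condition=Ready pod \
+  -l app.kubernetes.io/part-of=tekton-pipelines \
+  -n tekton-pipelines --timeout=120s
+```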
+ +### Step-by-Step Implementation + +#### Step 1: Create Tekton Tasks + +**Git Clone Task** (`infrastructure/ci-cd/tekton/tasks/git-clone.yaml`): +```yaml +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: git-clone + namespace: tekton-pipelines +spec: + workspaces: + - name: output + params: + - name: url + type: string + - name: revision + type: string + default: "main" + steps: + - name: clone + image: alpine/git + script: | + git clone $(params.url) $(workspaces.output.path) + cd $(workspaces.output.path) + git checkout $(params.revision) +``` + +**Detect Changed Services Task** (`infrastructure/ci-cd/tekton/tasks/detect-changes.yaml`): +```yaml +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: detect-changed-services + namespace: tekton-pipelines +spec: + workspaces: + - name: source + results: + - name: changed-services + description: List of changed services + steps: + - name: detect + image: alpine/git + script: | + cd $(workspaces.source.path) + # Get list of changed files + CHANGED_FILES=$(git diff --name-only HEAD~1 HEAD) + + # Map files to services + CHANGED_SERVICES=() + for file in $CHANGED_FILES; do + if [[ $file == services/* ]]; then + SERVICE=$(echo $file | cut -d'/' -f2) + CHANGED_SERVICES+=($SERVICE) + fi + done + + # Remove duplicates and output + echo $(printf "%s," "${CHANGED_SERVICES[@]}" | sed 's/,$//') | tee $(results.changed-services.path) +``` + +**Kaniko Build Task** (`infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml`): +```yaml +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: kaniko-build + namespace: tekton-pipelines +spec: + workspaces: + - name: source + - name: docker-credentials + params: + - name: services + type: string + - name: registry + type: string + default: "gitea.bakery-ia.local:5000" + steps: + - name: build-and-push + image: gcr.io/kaniko-project/executor:v1.9.0 + args: + - --dockerfile=$(workspaces.source.path)/services/$(params.services)/Dockerfile + - --context=$(workspaces.source.path) + - --destination=$(params.registry)/bakery/$(params.services):$(params.git-revision) + volumeMounts: + - name: docker-config + mountPath: /kaniko/.docker +``` + +#### Step 2: Create Tekton Pipeline + +**Main CI Pipeline** (`infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml`): +```yaml +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + name: bakery-ia-ci + namespace: tekton-pipelines +spec: + workspaces: + - name: shared-workspace + - name: docker-credentials + params: + - name: git-url + type: string + - name: git-revision + type: string + tasks: + - name: fetch-source + taskRef: + name: git-clone + workspaces: + - name: output + workspace: shared-workspace + params: + - name: url + value: $(params.git-url) + - name: revision + value: $(params.git-revision) + + - name: detect-changes + runAfter: [fetch-source] + taskRef: + name: detect-changed-services + workspaces: + - name: source + workspace: shared-workspace + + - name: build-and-push + runAfter: [detect-changes] + taskRef: + name: kaniko-build + workspaces: + - name: source + workspace: shared-workspace + - name: docker-credentials + workspace: docker-credentials + params: + - name: services + value: $(tasks.detect-changes.results.changed-services) + - name: registry + value: "gitea.bakery-ia.local:5000" +``` + +#### Step 3: Create Tekton Trigger + +**Trigger Template** (`infrastructure/ci-cd/tekton/triggers/trigger-template.yaml`): +```yaml +apiVersion: triggers.tekton.dev/v1alpha1 +kind: TriggerTemplate +metadata: + name: 
bakery-ia-trigger-template + namespace: tekton-pipelines +spec: + params: + - name: git-repo-url + - name: git-revision + resourcetemplates: + - apiVersion: tekton.dev/v1beta1 + kind: PipelineRun + metadata: + generateName: bakery-ia-ci-run- + spec: + pipelineRef: + name: bakery-ia-ci + workspaces: + - name: shared-workspace + volumeClaimTemplate: + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 1Gi + - name: docker-credentials + secret: + secretName: gitea-registry-credentials + params: + - name: git-url + value: $(params.git-repo-url) + - name: git-revision + value: $(params.git-revision) +``` + +**Trigger Binding** (`infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml`): +```yaml +apiVersion: triggers.tekton.dev/v1alpha1 +kind: TriggerBinding +metadata: + name: bakery-ia-trigger-binding + namespace: tekton-pipelines +spec: + params: + - name: git-repo-url + value: $(body.repository.clone_url) + - name: git-revision + value: $(body.head_commit.id) +``` + +**Event Listener** (`infrastructure/ci-cd/tekton/triggers/event-listener.yaml`): +```yaml +apiVersion: triggers.tekton.dev/v1alpha1 +kind: EventListener +metadata: + name: bakery-ia-listener + namespace: tekton-pipelines +spec: + serviceAccountName: tekton-triggers-sa + triggers: + - name: bakery-ia-trigger + bindings: + - ref: bakery-ia-trigger-binding + template: + ref: bakery-ia-trigger-template +``` + +#### Step 4: Configure Flux for GitOps + +**Git Repository Source** (`infrastructure/ci-cd/flux/git-repository.yaml`): +```yaml +apiVersion: source.toolkit.fluxcd.io/v1 +kind: GitRepository +metadata: + name: bakery-ia + namespace: flux-system +spec: + interval: 1m + url: http://gitea.bakery-ia.local/your-org/bakery-ia.git + ref: + branch: main + secretRef: + name: gitea-credentials +``` + +**Kustomization for Production** (`infrastructure/ci-cd/flux/kustomization.yaml`): +```yaml +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: bakery-ia-prod + namespace: flux-system +spec: + interval: 5m + path: ./infrastructure/kubernetes/overlays/prod + prune: true + sourceRef: + kind: GitRepository + name: bakery-ia + targetNamespace: bakery-ia +``` + +#### Step 5: Apply All Configurations + +```bash +# Apply Tekton tasks +microk8s kubectl apply -f infrastructure/ci-cd/tekton/tasks/ + +# Apply Tekton pipeline +microk8s kubectl apply -f infrastructure/ci-cd/tekton/pipelines/ + +# Apply Tekton triggers +microk8s kubectl apply -f infrastructure/ci-cd/tekton/triggers/ + +# Apply Flux configurations +microk8s kubectl apply -f infrastructure/ci-cd/flux/ +``` + +--- + +## Phase 3: Monitoring and Observability + +### Objective +Integrate SigNoz with CI/CD pipelines for comprehensive monitoring. 
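+
+One prerequisite worth calling out: the `OpenTelemetryCollector` object used in Step 1 below is a custom resource served by the OpenTelemetry Operator, which this plan does not install explicitly. A quick check, plus an install command if it is missing (a sketch, assuming the upstream operator manifest is acceptable and cert-manager is already available in the cluster):
+
+```bash
+# Verify the OpenTelemetryCollector CRD exists before applying Step 1
+microk8s kubectl get crd opentelemetrycollectors.opentelemetry.io
+
+# If missing, install the operator (it depends on cert-manager)
+microk8s kubectl apply -f https://github.com/open-telemetry/opentelemetry-operator/releases/latest/download/opentelemetry-operator.yaml
+```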
+ +### Step-by-Step Implementation + +#### Step 1: Configure OpenTelemetry for Tekton + +**Install OpenTelemetry Collector** (`infrastructure/ci-cd/monitoring/otel-collector.yaml`): +```yaml +apiVersion: opentelemetry.io/v1alpha1 +kind: OpenTelemetryCollector +metadata: + name: tekton-otel + namespace: tekton-pipelines +spec: + config: | + receivers: + otlp: + protocols: + grpc: + http: + processors: + batch: + exporters: + otlp: + endpoint: "signoz-otel-collector.monitoring.svc.cluster.local:4317" + tls: + insecure: true + service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [otlp] +``` + +**Apply Configuration**: +```bash +microk8s kubectl apply -f infrastructure/ci-cd/monitoring/otel-collector.yaml +``` + +#### Step 2: Instrument Tekton Pipelines + +**Update Pipeline with Tracing** (add to `ci-pipeline.yaml`): +```yaml +spec: + tasks: + - name: fetch-source + taskRef: + name: git-clone + # Add OpenTelemetry sidecar + sidecars: + - name: otel-collector + image: otel/opentelemetry-collector-contrib:0.70.0 + args: ["--config=/etc/otel-collector-config.yaml"] + volumeMounts: + - name: otel-config + mountPath: /etc/otel-collector-config.yaml + subPath: otel-collector-config.yaml + volumes: + - name: otel-config + configMap: + name: otel-collector-config +``` + +#### Step 3: Configure SigNoz Dashboards + +**Create CI/CD Dashboard**: +1. Log in to SigNoz UI +2. Create new dashboard: "CI/CD Pipeline Metrics" +3. Add panels: + - Pipeline execution time + - Success/failure rates + - Build duration by service + - Resource usage during builds + +**Create Deployment Dashboard**: +1. Create dashboard: "GitOps Deployment Metrics" +2. Add panels: + - Flux reconciliation events + - Deployment frequency + - Rollback events + - Resource changes + +--- + +## Phase 4: Testing and Validation + +### Objective +Validate CI/CD pipeline functionality without affecting production. 
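+
+The change-detection logic exercised in Test 3 below can also be dry-run locally before involving the cluster. A minimal sketch that mirrors the `detect-changed-services` task (run from a checkout of the repository):
+
+```bash
+# List the services touched by the last commit, comma-separated
+# (same services/<name>/ mapping used by the Tekton task)
+git diff --name-only HEAD~1 HEAD \
+  | awk -F/ '$1 == "services" {print $2}' \
+  | sort -u \
+  | paste -sd, -
+```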
+ +### Test Plan + +#### Test 1: Gitea Functionality +- **Test**: Push code to Gitea repository +- **Expected**: Code appears in Gitea UI, webhook triggers +- **Validation**: + ```bash + # Push test commit + cd bakery-ia +echo "test" > test-file.txt +git add test-file.txt +git commit -m "Test CI/CD" +git push origin main + ``` + +#### Test 2: Tekton Pipeline Trigger +- **Test**: Verify pipeline triggers on push +- **Expected**: PipelineRun created in tekton-pipelines namespace +- **Validation**: + ```bash + # Check PipelineRuns + microk8s kubectl get pipelineruns -n tekton-pipelines + ``` + +#### Test 3: Change Detection +- **Test**: Modify single service and verify only that service builds +- **Expected**: Only changed service is built and pushed +- **Validation**: + ```bash + # Check build logs + microk8s kubectl logs -n tekton-pipelines -c build-and-push + ``` + +#### Test 4: Image Registry +- **Test**: Verify images pushed to Gitea registry +- **Expected**: New image appears in registry +- **Validation**: + ```bash + # List images in registry + curl -X GET http://gitea.bakery-ia.local/api/v2/repositories/bakery/auth-service/tags + ``` + +#### Test 5: Flux Deployment +- **Test**: Verify Flux detects and applies changes +- **Expected**: New deployment in bakery-ia namespace +- **Validation**: + ```bash + # Check Flux reconciliation + microk8s kubectl get kustomizations -n flux-system + + # Check deployments + microk8s kubectl get deployments -n bakery-ia + ``` + +#### Test 6: Rollback +- **Test**: Verify rollback capability +- **Expected**: Previous version redeployed successfully +- **Validation**: + ```bash + # Rollback via Git + git revert +git push origin main + + # Verify rollback + microk8s kubectl get pods -n bakery-ia -w + ``` + +--- + +## Phase 5: Rollout and Migration + +### Objective +Gradually migrate from manual to automated CI/CD. + +### Migration Strategy + +#### Step 1: Parallel Run +- Run automated CI/CD alongside manual process +- Compare results for 1 week +- Monitor with SigNoz + +#### Step 2: Canary Deployment +- Start with non-critical services: + - auth-service + - tenant-service + - training-service +- Monitor stability and performance + +#### Step 3: Full Migration +- Migrate all services to automated pipeline +- Disable manual deployment scripts +- Update documentation + +#### Step 4: Cleanup +- Remove old Tiltfile/Skaffold configurations +- Archive manual deployment scripts +- Update team documentation + +--- + +## Risk Assessment + +### Identified Risks + +| Risk | Likelihood | Impact | Mitigation Strategy | +|------|------------|--------|---------------------| +| Pipeline fails to detect changes | Medium | High | Manual override procedure, detailed logging | +| Resource exhaustion during builds | High | Medium | Resource quotas, build queue limits | +| Registry storage fills up | Medium | Medium | Automated cleanup policy, monitoring alerts | +| Flux applies incorrect configuration | Low | High | Manual approval for first run, rollback testing | +| Network issues between components | Medium | High | Health checks, retry logic | + +### Mitigation Plan + +1. **Resource Management**: + - Set resource quotas for CI/CD namespaces + - Limit concurrent builds to 2 + - Monitor with SigNoz alerts + +2. **Backup Strategy**: + - Regular backups of Gitea (repos + registry) + - Backup Flux configurations + - Database backups for all services + +3. 
**Rollback Plan**: + - Document manual rollback procedures + - Test rollback for each service + - Maintain backup of manual deployment scripts + +4. **Monitoring Alerts**: + - Pipeline failure alerts + - Resource threshold alerts + - Deployment failure alerts + +--- + +## Success Metrics + +### Quantitative Metrics +1. **Deployment Frequency**: Increase from manual to automated deployments +2. **Lead Time for Changes**: Reduce from hours to minutes +3. **Change Failure Rate**: Maintain or reduce current rate +4. **Mean Time to Recovery**: Improve with automated rollbacks +5. **Resource Utilization**: Monitor CI/CD overhead (< 2GB baseline) + +### Qualitative Metrics +1. **Developer Satisfaction**: Survey team on CI/CD experience +2. **Deployment Confidence**: Reduced "works on my machine" issues +3. **Auditability**: Full traceability of all deployments +4. **Reliability**: Consistent deployment outcomes + +--- + +## Appendices + +### Appendix A: Required Tools and Versions +- MicroK8s: v1.27+ +- Gitea: v1.19+ +- Tekton Pipelines: v0.47+ +- Flux CD: v2.0+ +- SigNoz: v0.20+ +- Kaniko: v1.9+ + +### Appendix B: Network Requirements +- Internal DNS: `gitea.bakery-ia.local` +- Ingress: Configured for Gitea and SigNoz +- Network Policies: Allow communication between namespaces + +### Appendix C: Backup Procedures +```bash +# Backup Gitea +microk8s kubectl exec -n gitea gitea-0 -- gitea dump -c /data/gitea/conf/app.ini + +# Backup Flux configurations +microk8s kubectl get all -n flux-system -o yaml > flux-backup.yaml + +# Backup Tekton configurations +microk8s kubectl get all -n tekton-pipelines -o yaml > tekton-backup.yaml +``` + +### Appendix D: Troubleshooting Guide + +**Issue: Pipeline not triggering** +- Check Gitea webhook logs +- Verify EventListener pods +- Check TriggerBinding configuration + +**Issue: Build fails** +- Check Kaniko logs +- Verify Dockerfile paths +- Ensure registry credentials are correct + +**Issue: Flux not applying changes** +- Check GitRepository status +- Verify Kustomization reconciliation +- Check Flux logs + +--- + +## Conclusion + +This implementation plan provides a clear path to transition from manual deployments to a fully automated, self-hosted CI/CD system. By following the phased approach, we minimize risk while maximizing the benefits of automation, observability, and reliability. + +### Next Steps +1. Review and approve this plan +2. Schedule Phase 1 implementation +3. Assign team members to specific tasks +4. Begin infrastructure setup + +**Approval**: +- [ ] Team Lead +- [ ] DevOps Engineer +- [ ] Security Review + +**Implementation Start Date**: _______________ +**Target Completion Date**: _______________ diff --git a/INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md b/INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md new file mode 100644 index 00000000..ef36620d --- /dev/null +++ b/INFRASTRUCTURE_REORGANIZATION_PROPOSAL.md @@ -0,0 +1,413 @@ +# Infrastructure Reorganization Proposal for Bakery-IA + +## Executive Summary + +This document presents a comprehensive analysis of the current infrastructure organization and proposes a restructured layout that improves maintainability, scalability, and operational efficiency. The proposal is based on a detailed examination of the existing 177 files across 31 directories in the infrastructure folder. 
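+
+The file and directory counts cited above can be re-checked at any time from the repository root; one way (assuming a standard checkout) is:
+
+```bash
+# Reproduce the figures quoted in the executive summary
+find infrastructure -type f | wc -l              # files
+find infrastructure -mindepth 1 -type d | wc -l  # directories
+```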
+ +## Current Infrastructure Analysis + +### Current Structure Overview + +``` +infrastructure/ +├── ci-cd/ # 18 files - CI/CD pipeline components +├── helm/ # 8 files - Helm charts and scripts +├── kubernetes/ # 103 files - Kubernetes manifests and configs +├── signoz/ # 11 files - Monitoring dashboards and scripts +└── tls/ # 37 files - TLS certificates and generation scripts +``` + +### Key Findings + +1. **Kubernetes Base Components (103 files)**: The most complex area with: + - 20+ service deployments across 15+ microservices + - 20+ database configurations (PostgreSQL, RabbitMQ, MinIO) + - 19 migration jobs for different services + - Infrastructure components (gateway, monitoring, etc.) + +2. **CI/CD Pipeline (18 files)**: + - Tekton tasks and pipelines for GitOps workflow + - Flux CD configuration for continuous delivery + - Gitea configuration for Git repository management + +3. **Monitoring (11 files)**: + - SigNoz dashboards for comprehensive observability + - Import scripts for dashboard management + +4. **TLS Certificates (37 files)**: + - CA certificates and generation scripts + - Service-specific certificates (PostgreSQL, Redis, MinIO) + - Certificate signing requests and configurations + +### Strengths of Current Organization + +1. **Logical Grouping**: Components are generally well-grouped by function +2. **Base/Overlay Pattern**: Kubernetes uses proper base/overlay structure +3. **Comprehensive Monitoring**: SigNoz dashboards cover all major aspects +4. **Security Focus**: Dedicated TLS certificate management + +### Challenges Identified + +1. **Complexity in Kubernetes Base**: 103 files make navigation difficult +2. **Mixed Component Types**: Services, databases, and infrastructure mixed together +3. **Limited Environment Separation**: Only dev/prod overlays, no staging +4. **Script Scattering**: Automation scripts spread across directories +5. 
**Documentation Gaps**: Some components lack clear documentation + +## Proposed Infrastructure Organization + +### High-Level Structure + +``` +infrastructure/ +├── environments/ # Environment-specific configurations +├── platform/ # Platform-level infrastructure +├── services/ # Application services and microservices +├── monitoring/ # Observability and monitoring +├── cicd/ # CI/CD pipeline components +├── security/ # Security configurations and certificates +├── scripts/ # Automation and utility scripts +├── docs/ # Infrastructure documentation +└── README.md # Top-level infrastructure guide +``` + +### Detailed Structure Proposal + +``` +infrastructure/ +├── environments/ # Environment-specific configurations +│ ├── dev/ +│ │ ├── k8s-manifests/ +│ │ │ ├── base/ +│ │ │ │ ├── namespace.yaml +│ │ │ │ ├── configmap.yaml +│ │ │ │ ├── secrets.yaml +│ │ │ │ └── ingress-https.yaml +│ │ │ ├── components/ +│ │ │ │ ├── databases/ +│ │ │ │ ├── infrastructure/ +│ │ │ │ ├── microservices/ +│ │ │ │ └── cert-manager/ +│ │ │ ├── configs/ +│ │ │ ├── cronjobs/ +│ │ │ ├── jobs/ +│ │ │ └── migrations/ +│ │ ├── kustomization.yaml +│ │ └── values/ +│ ├── staging/ # New staging environment +│ │ ├── k8s-manifests/ +│ │ └── values/ +│ └── prod/ +│ ├── k8s-manifests/ +│ ├── terraform/ # Production-specific IaC +│ └── values/ +├── platform/ # Platform-level infrastructure +│ ├── cluster/ +│ │ ├── eks/ # AWS EKS configuration +│ │ │ ├── terraform/ +│ │ │ └── manifests/ +│ │ └── kind/ # Local development cluster +│ │ ├── config.yaml +│ │ └── manifests/ +│ ├── networking/ +│ │ ├── dns/ +│ │ ├── load-balancers/ +│ │ └── ingress/ +│ │ ├── nginx/ +│ │ └── cert-manager/ +│ ├── security/ +│ │ ├── rbac/ +│ │ ├── network-policies/ +│ │ └── tls/ +│ │ ├── ca/ +│ │ ├── postgres/ +│ │ ├── redis/ +│ │ └── minio/ +│ └── storage/ +│ ├── postgres/ +│ ├── redis/ +│ └── minio/ +├── services/ # Application services +│ ├── databases/ +│ │ ├── postgres/ +│ │ │ ├── k8s-manifests/ +│ │ │ ├── backups/ +│ │ │ ├── monitoring/ +│ │ │ └── maintenance/ +│ │ ├── redis/ +│ │ │ ├── configs/ +│ │ │ └── monitoring/ +│ │ └── minio/ +│ │ ├── buckets/ +│ │ └── policies/ +│ ├── api-gateway/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ └── microservices/ +│ ├── auth/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── tenant/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── training/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── forecasting/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── sales/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── external/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── notification/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── inventory/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── recipes/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── suppliers/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── pos/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── orders/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── production/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── procurement/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── orchestrator/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── alert-processor/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── ai-insights/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ ├── demo-session/ +│ │ ├── k8s-manifests/ +│ │ └── configs/ +│ └── frontend/ +│ ├── k8s-manifests/ +│ └── configs/ +├── monitoring/ # Observability stack +│ ├── signoz/ +│ │ ├── manifests/ +│ │ ├── dashboards/ +│ │ │ ├── alert-management.json +│ │ │ ├── api-performance.json +│ │ │ ├── 
application-performance.json +│ │ │ ├── database-performance.json +│ │ │ ├── error-tracking.json +│ │ │ ├── index.json +│ │ │ ├── infrastructure-monitoring.json +│ │ │ ├── log-analysis.json +│ │ │ ├── system-health.json +│ │ │ └── user-activity.json +│ │ ├── values-dev.yaml +│ │ ├── values-prod.yaml +│ │ ├── deploy-signoz.sh +│ │ ├── verify-signoz.sh +│ │ └── generate-test-traffic.sh +│ └── opentelemetry/ +│ ├── collector/ +│ └── agent/ +├── cicd/ # CI/CD pipeline +│ ├── gitea/ +│ │ ├── values.yaml +│ │ └── ingress.yaml +│ ├── tekton/ +│ │ ├── tasks/ +│ │ │ ├── git-clone.yaml +│ │ │ ├── detect-changes.yaml +│ │ │ ├── kaniko-build.yaml +│ │ │ └── update-gitops.yaml +│ │ ├── pipelines/ +│ │ └── triggers/ +│ └── flux/ +│ ├── git-repository.yaml +│ └── kustomization.yaml +├── security/ # Security configurations +│ ├── policies/ +│ │ ├── network-policies.yaml +│ │ ├── pod-security.yaml +│ │ └── rbac.yaml +│ ├── certificates/ +│ │ ├── ca/ +│ │ ├── services/ +│ │ └── rotation-scripts/ +│ ├── scanning/ +│ │ ├── trivy/ +│ │ └── policies/ +│ └── compliance/ +│ ├── cis-benchmarks/ +│ └── audit-scripts/ +├── scripts/ # Automation scripts +│ ├── setup/ +│ │ ├── generate-certificates.sh +│ │ ├── generate-minio-certificates.sh +│ │ └── setup-dockerhub-secrets.sh +│ ├── deployment/ +│ │ ├── deploy-signoz.sh +│ │ └── verify-signoz.sh +│ ├── maintenance/ +│ │ ├── regenerate_migrations_k8s.sh +│ │ └── kubernetes_restart.sh +│ └── verification/ +│ └── verify-registry.sh +├── docs/ # Infrastructure documentation +│ ├── architecture/ +│ │ ├── diagrams/ +│ │ └── decisions/ +│ ├── operations/ +│ │ ├── runbooks/ +│ │ └── troubleshooting/ +│ ├── onboarding/ +│ └── reference/ +│ ├── api/ +│ └── configurations/ +└── README.md +``` + +## Migration Strategy + +### Phase 1: Preparation and Planning + +1. **Inventory Analysis**: Complete detailed inventory of all current files +2. **Dependency Mapping**: Identify dependencies between components +3. **Impact Assessment**: Determine which components can be moved safely +4. **Backup Strategy**: Ensure all files are backed up before migration + +### Phase 2: Non-Critical Components + +1. **Documentation**: Move and update all documentation files +2. **Scripts**: Organize automation scripts into new structure +3. **Monitoring**: Migrate SigNoz dashboards and configurations +4. **CI/CD**: Reorganize pipeline components + +### Phase 3: Environment-Specific Components + +1. **Create Environment Structure**: Set up dev/staging/prod directories +2. **Migrate Kubernetes Manifests**: Move base components to appropriate locations +3. **Update References**: Ensure all cross-references are corrected +4. **Environment Validation**: Test each environment separately + +### Phase 4: Service Components + +1. **Database Migration**: Move database configurations to services/databases +2. **Microservice Organization**: Group microservices by domain +3. **Infrastructure Components**: Move gateway and other infrastructure +4. **Service Validation**: Test each service in isolation + +### Phase 5: Finalization + +1. **Integration Testing**: Test complete infrastructure workflow +2. **Documentation Update**: Finalize all documentation +3. **Team Training**: Conduct training on new structure +4. **Cleanup**: Remove old structure and temporary files + +## Benefits of Proposed Structure + +### 1. 
Improved Navigation +- **Clear Hierarchy**: Logical grouping by function and environment +- **Consistent Patterns**: Standardized structure across all components +- **Reduced Cognitive Load**: Easier to find specific components + +### 2. Enhanced Maintainability +- **Environment Isolation**: Clear separation of dev/staging/prod +- **Component Grouping**: Related components grouped together +- **Standardized Structure**: Consistent patterns across services + +### 3. Better Scalability +- **Modular Design**: Easy to add new services or environments +- **Domain Separation**: Services organized by business domain +- **Infrastructure Independence**: Platform components separate from services + +### 4. Improved Security +- **Centralized Security**: All security configurations in one place +- **Environment-Specific Policies**: Tailored security for each environment +- **Better Secret Management**: Clear structure for sensitive data + +### 5. Enhanced Observability +- **Comprehensive Monitoring**: All observability tools grouped +- **Standardized Dashboards**: Consistent monitoring across services +- **Centralized Logging**: Better log management structure + +## Implementation Considerations + +### Tools and Technologies +- **Terraform**: For infrastructure as code (IaC) +- **Kustomize**: For Kubernetes manifest management +- **Helm**: For complex application deployments +- **SOPS/Sealed Secrets**: For secret management +- **Trivy**: For vulnerability scanning + +### Team Adaptation +- **Training Plan**: Develop comprehensive training materials +- **Migration Guide**: Create step-by-step migration documentation +- **Support Period**: Provide dedicated support during transition +- **Feedback Mechanism**: Establish channels for team feedback + +### Risk Mitigation +- **Phased Approach**: Implement changes incrementally +- **Rollback Plan**: Develop comprehensive rollback procedures +- **Testing Strategy**: Implement thorough testing at each phase +- **Monitoring**: Enhanced monitoring during migration period + +## Expected Outcomes + +1. **Reduced Time-to-Find**: 40-60% reduction in time spent locating files +2. **Improved Deployment Speed**: 25-35% faster deployment cycles +3. **Enhanced Collaboration**: Better team coordination and understanding +4. **Reduced Errors**: 30-50% reduction in configuration errors +5. **Better Scalability**: Easier to add new services and features + +## Conclusion + +The proposed infrastructure reorganization represents a significant improvement over the current structure. By implementing a clear, logical hierarchy with proper separation of concerns, the new organization will: + +- **Improve operational efficiency** through better navigation and maintainability +- **Enhance security** with centralized security management +- **Support growth** with a scalable, modular design +- **Reduce errors** through standardized patterns and structures +- **Facilitate collaboration** with intuitive organization + +The key to successful implementation is a phased approach with thorough testing and team involvement at each stage. With proper planning and execution, this reorganization will provide long-term benefits for the Bakery-IA project's infrastructure management. 
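+
+The appendix that follows maps each current path to its proposed location. The moves themselves are mechanical and can preserve history with `git mv`; a sketch for a single service (the source directory name is illustrative, not taken from the current tree — substitute the real path from the mapping below):
+
+```bash
+# Relocate one service's manifests into the proposed layout while keeping Git history
+mkdir -p infrastructure/services/microservices/auth
+git mv infrastructure/kubernetes/base/components/auth-service \
+       infrastructure/services/microservices/auth/k8s-manifests
+git commit -m "refactor(infra): move auth-service manifests to the new layout"
+```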
+ +## Appendix: File Migration Mapping + +### Current → Proposed Mapping + +**Kubernetes Components:** +- `infrastructure/kubernetes/base/components/*` → `infrastructure/services/microservices/*/` +- `infrastructure/kubernetes/base/components/databases/*` → `infrastructure/services/databases/*/` +- `infrastructure/kubernetes/base/migrations/*` → `infrastructure/services/microservices/*/migrations/` +- `infrastructure/kubernetes/base/configs/*` → `infrastructure/environments/*/values/` + +**CI/CD Components:** +- `infrastructure/ci-cd/*` → `infrastructure/cicd/*/` + +**Monitoring Components:** +- `infrastructure/signoz/*` → `infrastructure/monitoring/signoz/*/` +- `infrastructure/helm/*` → `infrastructure/monitoring/signoz/*/` (signoz-related) + +**Security Components:** +- `infrastructure/tls/*` → `infrastructure/security/certificates/*/` + +**Scripts:** +- `infrastructure/kubernetes/*.sh` → `infrastructure/scripts/*/` +- `infrastructure/helm/*.sh` → `infrastructure/scripts/deployment/*/` +- `infrastructure/tls/*.sh` → `infrastructure/scripts/setup/*/` + +This mapping provides a clear path for migrating each component to its new location while maintaining functionality and relationships between components. \ No newline at end of file diff --git a/infrastructure/ci-cd/README.md b/infrastructure/ci-cd/README.md new file mode 100644 index 00000000..ebdb18dd --- /dev/null +++ b/infrastructure/ci-cd/README.md @@ -0,0 +1,294 @@ +# Bakery-IA CI/CD Implementation + +This directory contains the configuration for the production-grade CI/CD system for Bakery-IA using Gitea, Tekton, and Flux CD. + +## Architecture Overview + +```mermaid +graph TD + A[Developer] -->|Push Code| B[Gitea] + B -->|Webhook| C[Tekton Pipelines] + C -->|Build/Test| D[Gitea Registry] + D -->|New Image| E[Flux CD] + E -->|kubectl apply| F[MicroK8s Cluster] + F -->|Metrics| G[SigNoz] +``` + +## Directory Structure + +``` +infrastructure/ci-cd/ +├── gitea/ # Gitea configuration (Git server + registry) +│ ├── values.yaml # Helm values for Gitea +│ └── ingress.yaml # Ingress configuration +├── tekton/ # Tekton CI/CD pipeline configuration +│ ├── tasks/ # Individual pipeline tasks +│ │ ├── git-clone.yaml +│ │ ├── detect-changes.yaml +│ │ ├── kaniko-build.yaml +│ │ └── update-gitops.yaml +│ ├── pipelines/ # Pipeline definitions +│ │ └── ci-pipeline.yaml +│ └── triggers/ # Webhook trigger configuration +│ ├── trigger-template.yaml +│ ├── trigger-binding.yaml +│ ├── event-listener.yaml +│ └── gitlab-interceptor.yaml +├── flux/ # Flux CD GitOps configuration +│ ├── git-repository.yaml # Git repository source +│ └── kustomization.yaml # Deployment kustomization +├── monitoring/ # Monitoring configuration +│ └── otel-collector.yaml # OpenTelemetry collector +└── README.md # This file +``` + +## Deployment Instructions + +### Phase 1: Infrastructure Setup + +1. **Deploy Gitea**: + ```bash + # Add Helm repo + microk8s helm repo add gitea https://dl.gitea.io/charts + + # Create namespace + microk8s kubectl create namespace gitea + + # Install Gitea + microk8s helm install gitea gitea/gitea \ + -n gitea \ + -f infrastructure/ci-cd/gitea/values.yaml + + # Apply ingress + microk8s kubectl apply -f infrastructure/ci-cd/gitea/ingress.yaml + ``` + +2. 
**Deploy Tekton**: + ```bash + # Create namespace + microk8s kubectl create namespace tekton-pipelines + + # Install Tekton Pipelines + microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml + + # Install Tekton Triggers + microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/triggers/latest/release.yaml + + # Apply Tekton configurations + microk8s kubectl apply -f infrastructure/ci-cd/tekton/tasks/ + microk8s kubectl apply -f infrastructure/ci-cd/tekton/pipelines/ + microk8s kubectl apply -f infrastructure/ci-cd/tekton/triggers/ + ``` + +3. **Deploy Flux CD** (already enabled in MicroK8s): + ```bash + # Verify Flux installation + microk8s kubectl get pods -n flux-system + + # Apply Flux configurations + microk8s kubectl apply -f infrastructure/ci-cd/flux/ + ``` + +### Phase 2: Configuration + +1. **Set up Gitea webhook**: + - Go to your Gitea repository settings + - Add webhook with URL: `http://tekton-triggers.tekton-pipelines.svc.cluster.local:8080` + - Use the secret from `gitea-webhook-secret` + +2. **Configure registry credentials**: + ```bash + # Create registry credentials secret + microk8s kubectl create secret docker-registry gitea-registry-credentials \ + -n tekton-pipelines \ + --docker-server=gitea.bakery-ia.local:5000 \ + --docker-username=your-username \ + --docker-password=your-password + ``` + +3. **Configure Git credentials for Flux**: + ```bash + # Create Git credentials secret + microk8s kubectl create secret generic gitea-credentials \ + -n flux-system \ + --from-literal=username=your-username \ + --from-literal=password=your-password + ``` + +### Phase 3: Monitoring + +```bash +# Apply OpenTelemetry configuration +microk8s kubectl apply -f infrastructure/ci-cd/monitoring/otel-collector.yaml +``` + +## Usage + +### Triggering a Pipeline + +1. **Manual trigger**: + ```bash + # Create a PipelineRun manually + microk8s kubectl create -f - < -c + +# View Tekton dashboard +microk8s kubectl port-forward -n tekton-pipelines svc/tekton-dashboard 9097:9097 +``` + +## Troubleshooting + +### Common Issues + +1. **Pipeline not triggering**: + - Check Gitea webhook logs + - Verify EventListener pods are running + - Check TriggerBinding configuration + +2. **Build failures**: + - Check Kaniko logs for build errors + - Verify Dockerfile paths are correct + - Ensure registry credentials are valid + +3. **Flux not applying changes**: + - Check GitRepository status + - Verify Kustomization reconciliation + - Check Flux logs for errors + +### Debugging Commands + +```bash +# Check Tekton controller logs +microk8s kubectl logs -n tekton-pipelines -l app=tekton-pipelines-controller + +# Check Flux reconciliation +microk8s kubectl get kustomizations -n flux-system -o yaml + +# Check Gitea webhook delivery +microk8s kubectl logs -n tekton-pipelines -l app=tekton-triggers-controller +``` + +## Security Considerations + +1. **Secrets Management**: + - Use Kubernetes secrets for sensitive data + - Rotate credentials regularly + - Use RBAC for namespace isolation + +2. **Network Security**: + - Configure network policies + - Use internal DNS names + - Restrict ingress access + +3. 
**Registry Security**: + - Enable image scanning + - Use image signing + - Implement cleanup policies + +## Maintenance + +### Upgrading Components + +```bash +# Upgrade Tekton +microk8s kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml + +# Upgrade Flux +microk8s helm upgrade fluxcd fluxcd/flux2 -n flux-system + +# Upgrade Gitea +microk8s helm upgrade gitea gitea/gitea -n gitea -f infrastructure/ci-cd/gitea/values.yaml +``` + +### Backup Procedures + +```bash +# Backup Gitea +microk8s kubectl exec -n gitea gitea-0 -- gitea dump -c /data/gitea/conf/app.ini + +# Backup Flux configurations +microk8s kubectl get all -n flux-system -o yaml > flux-backup.yaml + +# Backup Tekton configurations +microk8s kubectl get all -n tekton-pipelines -o yaml > tekton-backup.yaml +``` + +## Performance Optimization + +1. **Resource Management**: + - Set appropriate resource limits + - Limit concurrent builds + - Use node selectors for build pods + +2. **Caching**: + - Configure Kaniko cache + - Use persistent volumes for dependencies + - Cache Docker layers + +3. **Parallelization**: + - Build independent services in parallel + - Use matrix builds for different architectures + - Optimize task dependencies + +## Integration with Existing System + +The CI/CD system integrates with: +- **SigNoz**: For monitoring and observability +- **MicroK8s**: For cluster management +- **Existing Kubernetes manifests**: In `infrastructure/kubernetes/` +- **Current services**: All 19 microservices in `services/` + +## Migration Plan + +1. **Phase 1**: Set up infrastructure (Gitea, Tekton, Flux) +2. **Phase 2**: Configure pipelines and triggers +3. **Phase 3**: Test with non-critical services +4. **Phase 4**: Gradual rollout to all services +5. 
**Phase 5**: Decommission old deployment methods + +## Support + +For issues with the CI/CD system: +- Check logs and monitoring first +- Review the troubleshooting section +- Consult the original implementation plan +- Refer to component documentation: + - [Tekton Documentation](https://tekton.dev/docs/) + - [Flux CD Documentation](https://fluxcd.io/docs/) + - [Gitea Documentation](https://docs.gitea.io/) \ No newline at end of file diff --git a/infrastructure/ci-cd/flux/git-repository.yaml b/infrastructure/ci-cd/flux/git-repository.yaml new file mode 100644 index 00000000..68eb46af --- /dev/null +++ b/infrastructure/ci-cd/flux/git-repository.yaml @@ -0,0 +1,16 @@ +# Flux GitRepository for Bakery-IA +# This resource tells Flux where to find the Git repository + +apiVersion: source.toolkit.fluxcd.io/v1 +kind: GitRepository +metadata: + name: bakery-ia + namespace: flux-system +spec: + interval: 1m + url: http://gitea.bakery-ia.local/bakery/bakery-ia.git + ref: + branch: main + secretRef: + name: gitea-credentials + timeout: 60s \ No newline at end of file diff --git a/infrastructure/ci-cd/flux/kustomization.yaml b/infrastructure/ci-cd/flux/kustomization.yaml new file mode 100644 index 00000000..37e9df54 --- /dev/null +++ b/infrastructure/ci-cd/flux/kustomization.yaml @@ -0,0 +1,27 @@ +# Flux Kustomization for Bakery-IA Production Deployment +# This resource tells Flux how to deploy the application + +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: bakery-ia-prod + namespace: flux-system +spec: + interval: 5m + path: ./infrastructure/kubernetes/overlays/prod + prune: true + sourceRef: + kind: GitRepository + name: bakery-ia + targetNamespace: bakery-ia + timeout: 5m + retryInterval: 1m + healthChecks: + - apiVersion: apps/v1 + kind: Deployment + name: auth-service + namespace: bakery-ia + - apiVersion: apps/v1 + kind: Deployment + name: gateway + namespace: bakery-ia \ No newline at end of file diff --git a/infrastructure/ci-cd/gitea/ingress.yaml b/infrastructure/ci-cd/gitea/ingress.yaml new file mode 100644 index 00000000..ecfbc9d5 --- /dev/null +++ b/infrastructure/ci-cd/gitea/ingress.yaml @@ -0,0 +1,25 @@ +# Gitea Ingress configuration for Bakery-IA CI/CD +# This provides external access to Gitea within the cluster + +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: gitea-ingress + namespace: gitea + annotations: + nginx.ingress.kubernetes.io/rewrite-target: / + nginx.ingress.kubernetes.io/proxy-body-size: "0" + nginx.ingress.kubernetes.io/proxy-read-timeout: "600" + nginx.ingress.kubernetes.io/proxy-send-timeout: "600" +spec: + rules: + - host: gitea.bakery-ia.local + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: gitea-http + port: + number: 3000 \ No newline at end of file diff --git a/infrastructure/ci-cd/gitea/values.yaml b/infrastructure/ci-cd/gitea/values.yaml new file mode 100644 index 00000000..f94929e8 --- /dev/null +++ b/infrastructure/ci-cd/gitea/values.yaml @@ -0,0 +1,38 @@ +# Gitea Helm values configuration for Bakery-IA CI/CD +# This configuration sets up Gitea with registry support and appropriate storage + +service: + type: ClusterIP + httpPort: 3000 + sshPort: 2222 + +persistence: + enabled: true + size: 50Gi + storageClass: "microk8s-hostpath" + +gitea: + config: + server: + DOMAIN: gitea.bakery-ia.local + SSH_DOMAIN: gitea.bakery-ia.local + ROOT_URL: http://gitea.bakery-ia.local + repository: + ENABLE_PUSH_CREATE_USER: true + ENABLE_PUSH_CREATE_ORG: true + registry: + ENABLED: true + 
+postgresql: + enabled: true + persistence: + size: 20Gi + +# Resource configuration for production environment +resources: + limits: + cpu: 1000m + memory: 1Gi + requests: + cpu: 500m + memory: 512Mi \ No newline at end of file diff --git a/infrastructure/ci-cd/monitoring/otel-collector.yaml b/infrastructure/ci-cd/monitoring/otel-collector.yaml new file mode 100644 index 00000000..a8634707 --- /dev/null +++ b/infrastructure/ci-cd/monitoring/otel-collector.yaml @@ -0,0 +1,70 @@ +# OpenTelemetry Collector for Bakery-IA CI/CD Monitoring +# This collects metrics and traces from Tekton pipelines + +apiVersion: opentelemetry.io/v1alpha1 +kind: OpenTelemetryCollector +metadata: + name: tekton-otel + namespace: tekton-pipelines +spec: + config: | + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + prometheus: + config: + scrape_configs: + - job_name: 'tekton-pipelines' + scrape_interval: 30s + static_configs: + - targets: ['tekton-pipelines-controller.tekton-pipelines.svc.cluster.local:9090'] + + processors: + batch: + timeout: 5s + send_batch_size: 1000 + memory_limiter: + check_interval: 2s + limit_percentage: 75 + spike_limit_percentage: 20 + + exporters: + otlp: + endpoint: "signoz-otel-collector.monitoring.svc.cluster.local:4317" + tls: + insecure: true + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + logging: + logLevel: debug + + service: + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [otlp, logging] + metrics: + receivers: [otlp, prometheus] + processors: [memory_limiter, batch] + exporters: [otlp, logging] + telemetry: + logs: + level: "info" + encoding: "json" + + mode: deployment + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 200m + memory: 256Mi \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml b/infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml new file mode 100644 index 00000000..c20068b2 --- /dev/null +++ b/infrastructure/ci-cd/tekton/pipelines/ci-pipeline.yaml @@ -0,0 +1,83 @@ +# Main CI Pipeline for Bakery-IA +# This pipeline orchestrates the build, test, and deploy process + +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + name: bakery-ia-ci + namespace: tekton-pipelines +spec: + workspaces: + - name: shared-workspace + - name: docker-credentials + params: + - name: git-url + type: string + description: Repository URL + - name: git-revision + type: string + description: Git revision/commit hash + - name: registry + type: string + description: Container registry URL + default: "gitea.bakery-ia.local:5000" + tasks: + - name: fetch-source + taskRef: + name: git-clone + workspaces: + - name: output + workspace: shared-workspace + params: + - name: url + value: $(params.git-url) + - name: revision + value: $(params.git-revision) + + - name: detect-changes + runAfter: [fetch-source] + taskRef: + name: detect-changed-services + workspaces: + - name: source + workspace: shared-workspace + + - name: build-and-push + runAfter: [detect-changes] + taskRef: + name: kaniko-build + when: + - input: "$(tasks.detect-changes.results.changed-services)" + operator: notin + values: ["none"] + workspaces: + - name: source + workspace: shared-workspace + - name: docker-credentials + workspace: docker-credentials + params: + - name: services + value: $(tasks.detect-changes.results.changed-services) + - name: registry + value: $(params.registry) + - name: git-revision 
+ value: $(params.git-revision) + + - name: update-gitops-manifests + runAfter: [build-and-push] + taskRef: + name: update-gitops + when: + - input: "$(tasks.detect-changes.results.changed-services)" + operator: notin + values: ["none"] + workspaces: + - name: source + workspace: shared-workspace + params: + - name: services + value: $(tasks.detect-changes.results.changed-services) + - name: registry + value: $(params.registry) + - name: git-revision + value: $(params.git-revision) \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/tasks/detect-changes.yaml b/infrastructure/ci-cd/tekton/tasks/detect-changes.yaml new file mode 100644 index 00000000..32abd32c --- /dev/null +++ b/infrastructure/ci-cd/tekton/tasks/detect-changes.yaml @@ -0,0 +1,64 @@ +# Tekton Detect Changed Services Task for Bakery-IA CI/CD +# This task identifies which services have changed in the repository + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: detect-changed-services + namespace: tekton-pipelines +spec: + workspaces: + - name: source + results: + - name: changed-services + description: Comma-separated list of changed services + steps: + - name: detect + image: alpine/git + script: | + #!/bin/sh + set -e + cd $(workspaces.source.path) + + echo "Detecting changed files..." + # Get list of changed files compared to previous commit + CHANGED_FILES=$(git diff --name-only HEAD~1 HEAD 2>/dev/null || git diff --name-only HEAD) + + echo "Changed files: $CHANGED_FILES" + + # Map files to services + CHANGED_SERVICES=() + for file in $CHANGED_FILES; do + if [[ $file == services/* ]]; then + SERVICE=$(echo $file | cut -d'/' -f2) + # Only add unique service names + if [[ ! " ${CHANGED_SERVICES[@]} " =~ " ${SERVICE} " ]]; then + CHANGED_SERVICES+=("$SERVICE") + fi + elif [[ $file == frontend/* ]]; then + CHANGED_SERVICES+=("frontend") + break + elif [[ $file == gateway/* ]]; then + CHANGED_SERVICES+=("gateway") + break + fi + done + + # If no specific services changed, check for infrastructure changes + if [ ${#CHANGED_SERVICES[@]} -eq 0 ]; then + for file in $CHANGED_FILES; do + if [[ $file == infrastructure/* ]]; then + CHANGED_SERVICES+=("infrastructure") + break + fi + done + fi + + # Output result + if [ ${#CHANGED_SERVICES[@]} -eq 0 ]; then + echo "No service changes detected" + echo "none" | tee $(results.changed-services.path) + else + echo "Detected changes in services: ${CHANGED_SERVICES[@]}" + echo $(printf "%s," "${CHANGED_SERVICES[@]}" | sed 's/,$//') | tee $(results.changed-services.path) + fi \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/tasks/git-clone.yaml b/infrastructure/ci-cd/tekton/tasks/git-clone.yaml new file mode 100644 index 00000000..5decee5c --- /dev/null +++ b/infrastructure/ci-cd/tekton/tasks/git-clone.yaml @@ -0,0 +1,31 @@ +# Tekton Git Clone Task for Bakery-IA CI/CD +# This task clones the source code repository + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: git-clone + namespace: tekton-pipelines +spec: + workspaces: + - name: output + params: + - name: url + type: string + description: Repository URL to clone + - name: revision + type: string + description: Git revision to checkout + default: "main" + steps: + - name: clone + image: alpine/git + script: | + #!/bin/sh + set -e + echo "Cloning repository: $(params.url)" + git clone $(params.url) $(workspaces.output.path) + cd $(workspaces.output.path) + echo "Checking out revision: $(params.revision)" + git checkout $(params.revision) + echo "Repository cloned 
successfully" \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml b/infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml new file mode 100644 index 00000000..1f9290e8 --- /dev/null +++ b/infrastructure/ci-cd/tekton/tasks/kaniko-build.yaml @@ -0,0 +1,40 @@ +# Tekton Kaniko Build Task for Bakery-IA CI/CD +# This task builds and pushes container images using Kaniko + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: kaniko-build + namespace: tekton-pipelines +spec: + workspaces: + - name: source + - name: docker-credentials + params: + - name: services + type: string + description: Comma-separated list of services to build + - name: registry + type: string + description: Container registry URL + default: "gitea.bakery-ia.local:5000" + - name: git-revision + type: string + description: Git revision for image tag + default: "latest" + steps: + - name: build-and-push + image: gcr.io/kaniko-project/executor:v1.9.0 + args: + - --dockerfile=$(workspaces.source.path)/services/$(params.services)/Dockerfile + - --context=$(workspaces.source.path) + - --destination=$(params.registry)/bakery/$(params.services):$(params.git-revision) + - --verbosity=info + volumeMounts: + - name: docker-config + mountPath: /kaniko/.docker + securityContext: + runAsUser: 0 + volumes: + - name: docker-config + emptyDir: {} \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/tasks/update-gitops.yaml b/infrastructure/ci-cd/tekton/tasks/update-gitops.yaml new file mode 100644 index 00000000..1e89d170 --- /dev/null +++ b/infrastructure/ci-cd/tekton/tasks/update-gitops.yaml @@ -0,0 +1,66 @@ +# Tekton Update GitOps Manifests Task for Bakery-IA CI/CD +# This task updates Kubernetes manifests with new image tags + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: update-gitops + namespace: tekton-pipelines +spec: + workspaces: + - name: source + params: + - name: services + type: string + description: Comma-separated list of services to update + - name: registry + type: string + description: Container registry URL + - name: git-revision + type: string + description: Git revision for image tag + steps: + - name: update-manifests + image: bitnami/kubectl + script: | + #!/bin/sh + set -e + cd $(workspaces.source.path) + + echo "Updating GitOps manifests for services: $(params.services)" + + # Split services by comma + IFS=',' read -ra SERVICES <<< "$(params.services)" + + for service in "${SERVICES[@]}"; do + echo "Processing service: $service" + + # Find and update Kubernetes manifests + if [ "$service" = "frontend" ]; then + # Update frontend deployment + if [ -f "infrastructure/kubernetes/overlays/prod/frontend-deployment.yaml" ]; then + sed -i "s|image:.*|image: $(params.registry)/bakery/frontend:$(params.git-revision)|g" \ + "infrastructure/kubernetes/overlays/prod/frontend-deployment.yaml" + fi + elif [ "$service" = "gateway" ]; then + # Update gateway deployment + if [ -f "infrastructure/kubernetes/overlays/prod/gateway-deployment.yaml" ]; then + sed -i "s|image:.*|image: $(params.registry)/bakery/gateway:$(params.git-revision)|g" \ + "infrastructure/kubernetes/overlays/prod/gateway-deployment.yaml" + fi + else + # Update service deployment + DEPLOYMENT_FILE="infrastructure/kubernetes/overlays/prod/${service}-deployment.yaml" + if [ -f "$DEPLOYMENT_FILE" ]; then + sed -i "s|image:.*|image: $(params.registry)/bakery/${service}:$(params.git-revision)|g" \ + "$DEPLOYMENT_FILE" + fi + fi + done + + # Commit changes + git config --global user.name 
"bakery-ia-ci" + git config --global user.email "ci@bakery-ia.local" + git add . + git commit -m "CI: Update image tags for $(params.services) to $(params.git-revision)" + git push origin HEAD \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/triggers/event-listener.yaml b/infrastructure/ci-cd/tekton/triggers/event-listener.yaml new file mode 100644 index 00000000..5049bacb --- /dev/null +++ b/infrastructure/ci-cd/tekton/triggers/event-listener.yaml @@ -0,0 +1,26 @@ +# Tekton EventListener for Bakery-IA CI/CD +# This listener receives webhook events and triggers pipelines + +apiVersion: triggers.tekton.dev/v1alpha1 +kind: EventListener +metadata: + name: bakery-ia-listener + namespace: tekton-pipelines +spec: + serviceAccountName: tekton-triggers-sa + triggers: + - name: bakery-ia-gitea-trigger + bindings: + - ref: bakery-ia-trigger-binding + template: + ref: bakery-ia-trigger-template + interceptors: + - ref: + name: "gitlab" + params: + - name: "secretRef" + value: + secretName: gitea-webhook-secret + secretKey: secretToken + - name: "eventTypes" + value: ["push"] \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml b/infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml new file mode 100644 index 00000000..c8fc1c26 --- /dev/null +++ b/infrastructure/ci-cd/tekton/triggers/gitlab-interceptor.yaml @@ -0,0 +1,14 @@ +# GitLab/Gitea Webhook Interceptor for Tekton Triggers +# This interceptor validates and processes Gitea webhook events + +apiVersion: triggers.tekton.dev/v1alpha1 +kind: ClusterInterceptor +metadata: + name: gitlab +spec: + clientConfig: + service: + name: tekton-triggers-core-interceptors + namespace: tekton-pipelines + path: "/v1/webhook/gitlab" + port: 8443 \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml b/infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml new file mode 100644 index 00000000..8116792a --- /dev/null +++ b/infrastructure/ci-cd/tekton/triggers/trigger-binding.yaml @@ -0,0 +1,16 @@ +# Tekton TriggerBinding for Bakery-IA CI/CD +# This binding extracts parameters from Gitea webhook events + +apiVersion: triggers.tekton.dev/v1alpha1 +kind: TriggerBinding +metadata: + name: bakery-ia-trigger-binding + namespace: tekton-pipelines +spec: + params: + - name: git-repo-url + value: $(body.repository.clone_url) + - name: git-revision + value: $(body.head_commit.id) + - name: git-repo-name + value: $(body.repository.name) \ No newline at end of file diff --git a/infrastructure/ci-cd/tekton/triggers/trigger-template.yaml b/infrastructure/ci-cd/tekton/triggers/trigger-template.yaml new file mode 100644 index 00000000..17208bf8 --- /dev/null +++ b/infrastructure/ci-cd/tekton/triggers/trigger-template.yaml @@ -0,0 +1,43 @@ +# Tekton TriggerTemplate for Bakery-IA CI/CD +# This template defines how PipelineRuns are created when triggers fire + +apiVersion: triggers.tekton.dev/v1alpha1 +kind: TriggerTemplate +metadata: + name: bakery-ia-trigger-template + namespace: tekton-pipelines +spec: + params: + - name: git-repo-url + description: The git repository URL + - name: git-revision + description: The git revision/commit hash + - name: git-repo-name + description: The git repository name + default: "bakery-ia" + resourcetemplates: + - apiVersion: tekton.dev/v1beta1 + kind: PipelineRun + metadata: + generateName: bakery-ia-ci-run-$(params.git-repo-name)- + spec: + pipelineRef: + name: bakery-ia-ci + workspaces: + - name: shared-workspace + 
volumeClaimTemplate: + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 5Gi + - name: docker-credentials + secret: + secretName: gitea-registry-credentials + params: + - name: git-url + value: $(params.git-repo-url) + - name: git-revision + value: $(params.git-revision) + - name: registry + value: "gitea.bakery-ia.local:5000" \ No newline at end of file diff --git a/services/orchestrator/app/main.py b/services/orchestrator/app/main.py index 33271ecb..94b5351c 100644 --- a/services/orchestrator/app/main.py +++ b/services/orchestrator/app/main.py @@ -18,6 +18,28 @@ class OrchestratorService(StandardFastAPIService): expected_migration_version = "001_initial_schema" + def __init__(self): + # Define expected database tables for health checks + orchestrator_expected_tables = [ + 'orchestration_runs' + ] + + self.rabbitmq_client = None + self.event_publisher = None + self.leader_election = None + self.scheduler_service = None + + super().__init__( + service_name="orchestrator-service", + app_name=settings.APP_NAME, + description=settings.DESCRIPTION, + version=settings.VERSION, + api_prefix="", # Empty because RouteBuilder already includes /api/v1 + database_manager=database_manager, + expected_tables=orchestrator_expected_tables, + enable_messaging=True # Enable RabbitMQ for event publishing + ) + async def verify_migrations(self): """Verify database schema matches the latest migrations""" try: @@ -32,26 +54,6 @@ class OrchestratorService(StandardFastAPIService): self.logger.error(f"Migration verification failed: {e}") raise - def __init__(self): - # Define expected database tables for health checks - orchestrator_expected_tables = [ - 'orchestration_runs' - ] - - self.rabbitmq_client = None - self.event_publisher = None - - super().__init__( - service_name="orchestrator-service", - app_name=settings.APP_NAME, - description=settings.DESCRIPTION, - version=settings.VERSION, - api_prefix="", # Empty because RouteBuilder already includes /api/v1 - database_manager=database_manager, - expected_tables=orchestrator_expected_tables, - enable_messaging=True # Enable RabbitMQ for event publishing - ) - async def _setup_messaging(self): """Setup messaging for orchestrator service""" from shared.messaging import UnifiedEventPublisher, RabbitMQClient @@ -84,22 +86,91 @@ class OrchestratorService(StandardFastAPIService): self.logger.info("Orchestrator Service starting up...") - # Initialize orchestrator scheduler service with EventPublisher - from app.services.orchestrator_service import OrchestratorSchedulerService - scheduler_service = OrchestratorSchedulerService(self.event_publisher, settings) - await scheduler_service.start() - app.state.scheduler_service = scheduler_service - self.logger.info("Orchestrator scheduler service started") + # Initialize leader election for horizontal scaling + # Only the leader pod will run the scheduler + await self._setup_leader_election(app) # REMOVED: Delivery tracking service - moved to procurement service (domain ownership) + async def _setup_leader_election(self, app: FastAPI): + """ + Setup leader election for scheduler. + + CRITICAL FOR HORIZONTAL SCALING: + Without leader election, each pod would run the same scheduled jobs, + causing duplicate forecasts, production schedules, and database contention. 
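+
+        How the election works (sketch; the mechanism is implemented in
+        shared/leader_election/service.py): leadership is claimed with an
+        atomic Redis SET NX EX, the leader keeps the key alive with periodic
+        heartbeats, and if the leader pod dies the key expires so another pod
+        can take over and start its own scheduler via on_become_leader.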
+ """ + from shared.leader_election import LeaderElectionService + import redis.asyncio as redis + + try: + # Create Redis connection for leader election + redis_url = f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}" + if settings.REDIS_TLS_ENABLED.lower() == "true": + redis_url = redis_url.replace("redis://", "rediss://") + + redis_client = redis.from_url(redis_url, decode_responses=False) + await redis_client.ping() + + # Use shared leader election service + self.leader_election = LeaderElectionService( + redis_client, + service_name="orchestrator" + ) + + # Define callbacks for leader state changes + async def on_become_leader(): + self.logger.info("This pod became the leader - starting scheduler") + from app.services.orchestrator_service import OrchestratorSchedulerService + self.scheduler_service = OrchestratorSchedulerService(self.event_publisher, settings) + await self.scheduler_service.start() + app.state.scheduler_service = self.scheduler_service + self.logger.info("Orchestrator scheduler service started (leader only)") + + async def on_lose_leader(): + self.logger.warning("This pod lost leadership - stopping scheduler") + if self.scheduler_service: + await self.scheduler_service.stop() + self.scheduler_service = None + if hasattr(app.state, 'scheduler_service'): + app.state.scheduler_service = None + self.logger.info("Orchestrator scheduler service stopped (no longer leader)") + + # Start leader election + await self.leader_election.start( + on_become_leader=on_become_leader, + on_lose_leader=on_lose_leader + ) + + # Store leader election in app state for health checks + app.state.leader_election = self.leader_election + + self.logger.info("Leader election initialized", + is_leader=self.leader_election.is_leader, + instance_id=self.leader_election.instance_id) + + except Exception as e: + self.logger.error("Failed to setup leader election, falling back to standalone mode", + error=str(e)) + # Fallback: start scheduler anyway (for single-pod deployments) + from app.services.orchestrator_service import OrchestratorSchedulerService + self.scheduler_service = OrchestratorSchedulerService(self.event_publisher, settings) + await self.scheduler_service.start() + app.state.scheduler_service = self.scheduler_service + self.logger.warning("Scheduler started in standalone mode (no leader election)") + async def on_shutdown(self, app: FastAPI): """Custom shutdown logic for orchestrator service""" self.logger.info("Orchestrator Service shutting down...") - # Stop scheduler service - if hasattr(app.state, 'scheduler_service'): - await app.state.scheduler_service.stop() + # Stop leader election (this will also stop scheduler if we're the leader) + if self.leader_election: + await self.leader_election.stop() + self.logger.info("Leader election stopped") + + # Stop scheduler service if still running + if self.scheduler_service: + await self.scheduler_service.stop() self.logger.info("Orchestrator scheduler service stopped") diff --git a/services/procurement/app/services/delivery_tracking_service.py b/services/procurement/app/services/delivery_tracking_service.py index 32fe8167..fa56a730 100644 --- a/services/procurement/app/services/delivery_tracking_service.py +++ b/services/procurement/app/services/delivery_tracking_service.py @@ -1,12 +1,12 @@ """ -Delivery Tracking Service - Simplified +Delivery Tracking Service - With Leader Election Tracks purchase order deliveries and generates appropriate alerts using EventPublisher: - 
DELIVERY_ARRIVING_SOON: 2 hours before delivery window - DELIVERY_OVERDUE: 30 minutes after expected delivery time - STOCK_RECEIPT_INCOMPLETE: If delivery not marked as received -Runs as internal scheduler with leader election. +Runs as internal scheduler with leader election for horizontal scaling. Domain ownership: Procurement service owns all PO and delivery tracking. """ @@ -30,7 +30,7 @@ class DeliveryTrackingService: Monitors PO deliveries and generates time-based alerts using EventPublisher. Uses APScheduler with leader election to run hourly checks. - Only one pod executes checks (others skip if not leader). + Only one pod executes checks - leader election ensures no duplicate alerts. """ def __init__(self, event_publisher: UnifiedEventPublisher, config, database_manager=None): @@ -38,46 +38,121 @@ class DeliveryTrackingService: self.config = config self.database_manager = database_manager self.scheduler = AsyncIOScheduler() - self.is_leader = False + self._leader_election = None + self._redis_client = None + self._scheduler_started = False self.instance_id = str(uuid4())[:8] # Short instance ID for logging async def start(self): - """Start the delivery tracking scheduler""" - # Initialize and start scheduler if not already running + """Start the delivery tracking scheduler with leader election""" + try: + # Initialize leader election + await self._setup_leader_election() + except Exception as e: + logger.error("Failed to setup leader election, starting in standalone mode", + error=str(e)) + # Fallback: start scheduler without leader election + await self._start_scheduler() + + async def _setup_leader_election(self): + """Setup Redis-based leader election for horizontal scaling""" + from shared.leader_election import LeaderElectionService + import redis.asyncio as redis + + # Build Redis URL from config + redis_url = getattr(self.config, 'REDIS_URL', None) + if not redis_url: + redis_password = getattr(self.config, 'REDIS_PASSWORD', '') + redis_host = getattr(self.config, 'REDIS_HOST', 'localhost') + redis_port = getattr(self.config, 'REDIS_PORT', 6379) + redis_db = getattr(self.config, 'REDIS_DB', 0) + redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}" + + self._redis_client = redis.from_url(redis_url, decode_responses=False) + await self._redis_client.ping() + + # Create leader election service + self._leader_election = LeaderElectionService( + self._redis_client, + service_name="procurement-delivery-tracking" + ) + + # Start leader election with callbacks + await self._leader_election.start( + on_become_leader=self._on_become_leader, + on_lose_leader=self._on_lose_leader + ) + + logger.info("Leader election initialized for delivery tracking", + is_leader=self._leader_election.is_leader, + instance_id=self.instance_id) + + async def _on_become_leader(self): + """Called when this instance becomes the leader""" + logger.info("Became leader for delivery tracking - starting scheduler", + instance_id=self.instance_id) + await self._start_scheduler() + + async def _on_lose_leader(self): + """Called when this instance loses leadership""" + logger.warning("Lost leadership for delivery tracking - stopping scheduler", + instance_id=self.instance_id) + await self._stop_scheduler() + + async def _start_scheduler(self): + """Start the APScheduler with delivery tracking jobs""" + if self._scheduler_started: + logger.debug("Scheduler already started", instance_id=self.instance_id) + return + if not self.scheduler.running: # Add hourly job to check deliveries 
self.scheduler.add_job( self._check_all_tenants, - trigger=CronTrigger(minute=30), # Run every hour at :30 (00:30, 01:30, 02:30, etc.) + trigger=CronTrigger(minute=30), # Run every hour at :30 id='hourly_delivery_check', name='Hourly Delivery Tracking', replace_existing=True, - max_instances=1, # Ensure no overlapping runs - coalesce=True # Combine missed runs + max_instances=1, + coalesce=True ) self.scheduler.start() + self._scheduler_started = True - # Log next run time next_run = self.scheduler.get_job('hourly_delivery_check').next_run_time - logger.info( - "Delivery tracking scheduler started with hourly checks", - instance_id=self.instance_id, - next_run=next_run.isoformat() if next_run else None - ) - else: - logger.info( - "Delivery tracking scheduler already running", - instance_id=self.instance_id - ) + logger.info("Delivery tracking scheduler started", + instance_id=self.instance_id, + next_run=next_run.isoformat() if next_run else None) + + async def _stop_scheduler(self): + """Stop the APScheduler""" + if not self._scheduler_started: + return + + if self.scheduler.running: + self.scheduler.shutdown(wait=False) + self._scheduler_started = False + logger.info("Delivery tracking scheduler stopped", instance_id=self.instance_id) async def stop(self): - """Stop the scheduler and release leader lock""" - if self.scheduler.running: - self.scheduler.shutdown(wait=True) # Graceful shutdown - logger.info("Delivery tracking scheduler stopped", instance_id=self.instance_id) - else: - logger.info("Delivery tracking scheduler already stopped", instance_id=self.instance_id) + """Stop the scheduler and leader election""" + # Stop leader election first + if self._leader_election: + await self._leader_election.stop() + logger.info("Leader election stopped", instance_id=self.instance_id) + + # Stop scheduler + await self._stop_scheduler() + + # Close Redis + if self._redis_client: + await self._redis_client.close() + + @property + def is_leader(self) -> bool: + """Check if this instance is the leader""" + return self._leader_election.is_leader if self._leader_election else True async def _check_all_tenants(self): """ diff --git a/services/training/app/main.py b/services/training/app/main.py index 89e76c3b..f40497dd 100644 --- a/services/training/app/main.py +++ b/services/training/app/main.py @@ -46,6 +46,9 @@ class TrainingService(StandardFastAPIService): await setup_messaging() self.logger.info("Messaging setup completed") + # Initialize Redis pub/sub for cross-pod WebSocket broadcasting + await self._setup_websocket_redis() + # Set up WebSocket event consumer (listens to RabbitMQ and broadcasts to WebSockets) success = await setup_websocket_event_consumer() if success: @@ -53,8 +56,44 @@ class TrainingService(StandardFastAPIService): else: self.logger.warning("WebSocket event consumer setup failed") + async def _setup_websocket_redis(self): + """ + Initialize Redis pub/sub for WebSocket cross-pod broadcasting. + + CRITICAL FOR HORIZONTAL SCALING: + Without this, WebSocket clients on Pod A won't receive events + from training jobs running on Pod B. + """ + try: + from app.websocket.manager import websocket_manager + from app.core.config import settings + + redis_url = settings.REDIS_URL + success = await websocket_manager.initialize_redis(redis_url) + + if success: + self.logger.info("WebSocket Redis pub/sub initialized for horizontal scaling") + else: + self.logger.warning( + "WebSocket Redis pub/sub failed to initialize. " + "WebSocket events will only be delivered to local connections." 
+ ) + + except Exception as e: + self.logger.error("Failed to setup WebSocket Redis pub/sub", + error=str(e)) + # Don't fail startup - WebSockets will work locally without Redis + async def _cleanup_messaging(self): """Cleanup messaging for training service""" + # Shutdown WebSocket Redis pub/sub + try: + from app.websocket.manager import websocket_manager + await websocket_manager.shutdown() + self.logger.info("WebSocket Redis pub/sub shutdown completed") + except Exception as e: + self.logger.warning("Error shutting down WebSocket Redis", error=str(e)) + await cleanup_websocket_consumers() await cleanup_messaging() @@ -78,13 +117,49 @@ class TrainingService(StandardFastAPIService): async def on_startup(self, app: FastAPI): """Custom startup logic including migration verification""" await self.verify_migrations() - + # Initialize system metrics collection system_metrics = SystemMetricsCollector("training") self.logger.info("System metrics collection started") - + + # Recover stale jobs from previous pod crashes + # This is important for horizontal scaling - jobs may be left in 'running' + # state if a pod crashes. We mark them as failed so they can be retried. + await self._recover_stale_jobs() + self.logger.info("Training service startup completed") + async def _recover_stale_jobs(self): + """ + Recover stale training jobs on startup. + + When a pod crashes mid-training, jobs are left in 'running' or 'pending' state. + This method finds jobs that haven't been updated in a while and marks them + as failed so users can retry them. + """ + try: + from app.repositories.training_log_repository import TrainingLogRepository + + async with self.database_manager.get_session() as session: + log_repo = TrainingLogRepository(session) + + # Recover jobs that haven't been updated in 60 minutes + # This is conservative - most training jobs complete within 30 minutes + recovered = await log_repo.recover_stale_jobs(stale_threshold_minutes=60) + + if recovered: + self.logger.warning( + "Recovered stale training jobs on startup", + recovered_count=len(recovered), + job_ids=[j.job_id for j in recovered] + ) + else: + self.logger.info("No stale training jobs to recover") + + except Exception as e: + # Don't fail startup if recovery fails - just log the error + self.logger.error("Failed to recover stale jobs on startup", error=str(e)) + async def on_shutdown(self, app: FastAPI): """Custom shutdown logic for training service""" await cleanup_training_database() diff --git a/services/training/app/repositories/training_log_repository.py b/services/training/app/repositories/training_log_repository.py index eee6aef3..7bae3098 100644 --- a/services/training/app/repositories/training_log_repository.py +++ b/services/training/app/repositories/training_log_repository.py @@ -342,4 +342,166 @@ class TrainingLogRepository(TrainingBaseRepository): logger.error("Failed to get start time", job_id=job_id, error=str(e)) - return None \ No newline at end of file + return None + + async def create_job_atomic( + self, + job_id: str, + tenant_id: str, + config: Dict[str, Any] = None + ) -> tuple[Optional[ModelTrainingLog], bool]: + """ + Atomically create a training job, respecting the unique constraint. + + This method uses INSERT ... ON CONFLICT to handle race conditions + when multiple pods try to create a job for the same tenant simultaneously. + The database constraint (idx_unique_active_training_per_tenant) ensures + only one active job per tenant can exist. 
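+
+        Illustrative usage (variable names follow app/main.py; values are examples):
+
+            job, created = await log_repo.create_job_atomic(job_id, tenant_id)
+            if not created:
+                # another pod already owns the active job for this tenant
+                return job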
+ + Args: + job_id: Unique job identifier + tenant_id: Tenant identifier + config: Optional job configuration + + Returns: + Tuple of (job, created): + - If created: (new_job, True) + - If conflict (existing active job): (existing_job, False) + - If error: raises DatabaseError + """ + try: + # First, try to find an existing active job + existing = await self.get_active_jobs(tenant_id=tenant_id) + pending = await self.get_logs_by_tenant(tenant_id=tenant_id, status="pending", limit=1) + + if existing or pending: + # Return existing job + active_job = existing[0] if existing else pending[0] + logger.info("Found existing active job, skipping creation", + existing_job_id=active_job.job_id, + tenant_id=tenant_id, + requested_job_id=job_id) + return (active_job, False) + + # Try to create the new job + # If another pod created one in the meantime, the unique constraint will prevent this + log_data = { + "job_id": job_id, + "tenant_id": tenant_id, + "status": "pending", + "progress": 0, + "current_step": "initializing", + "config": config or {} + } + + try: + new_job = await self.create_training_log(log_data) + await self.session.commit() + logger.info("Created new training job atomically", + job_id=job_id, + tenant_id=tenant_id) + return (new_job, True) + except Exception as create_error: + error_str = str(create_error).lower() + # Check if this is a unique constraint violation + if "unique" in error_str or "duplicate" in error_str or "constraint" in error_str: + await self.session.rollback() + # Another pod created a job, fetch it + logger.info("Unique constraint hit, fetching existing job", + tenant_id=tenant_id, + requested_job_id=job_id) + existing = await self.get_active_jobs(tenant_id=tenant_id) + pending = await self.get_logs_by_tenant(tenant_id=tenant_id, status="pending", limit=1) + if existing or pending: + active_job = existing[0] if existing else pending[0] + return (active_job, False) + # If still no job found, something went wrong + raise DatabaseError(f"Constraint violation but no active job found: {create_error}") + else: + raise + + except DatabaseError: + raise + except Exception as e: + logger.error("Failed to create job atomically", + job_id=job_id, + tenant_id=tenant_id, + error=str(e)) + raise DatabaseError(f"Failed to create training job atomically: {str(e)}") + + async def recover_stale_jobs(self, stale_threshold_minutes: int = 60) -> List[ModelTrainingLog]: + """ + Find and mark stale running jobs as failed. + + This is used during service startup to clean up jobs that were + running when a pod crashed. With multiple replicas, only stale + jobs (not updated recently) should be marked as failed. 
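+
+        Called from service startup (see TrainingService._recover_stale_jobs);
+        illustrative only:
+
+            recovered = await log_repo.recover_stale_jobs(stale_threshold_minutes=60)
+            # each returned job is now marked 'failed' and can be retried by the user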
+ + Args: + stale_threshold_minutes: Jobs not updated for this long are considered stale + + Returns: + List of jobs that were marked as failed + """ + try: + stale_cutoff = datetime.now() - timedelta(minutes=stale_threshold_minutes) + + # Find running jobs that haven't been updated recently + query = text(""" + SELECT id, job_id, tenant_id, status, updated_at + FROM model_training_logs + WHERE status IN ('running', 'pending') + AND updated_at < :stale_cutoff + """) + + result = await self.session.execute(query, {"stale_cutoff": stale_cutoff}) + stale_jobs = result.fetchall() + + recovered_jobs = [] + for row in stale_jobs: + try: + # Mark as failed + update_query = text(""" + UPDATE model_training_logs + SET status = 'failed', + error_message = :error_msg, + end_time = :end_time, + updated_at = :updated_at + WHERE id = :id AND status IN ('running', 'pending') + """) + + await self.session.execute(update_query, { + "id": row.id, + "error_msg": f"Job recovered as failed - not updated since {row.updated_at.isoformat()}. Pod may have crashed.", + "end_time": datetime.now(), + "updated_at": datetime.now() + }) + + logger.warning("Recovered stale training job", + job_id=row.job_id, + tenant_id=str(row.tenant_id), + last_updated=row.updated_at.isoformat() if row.updated_at else "unknown") + + # Fetch the updated job to return + job = await self.get_by_job_id(row.job_id) + if job: + recovered_jobs.append(job) + + except Exception as job_error: + logger.error("Failed to recover individual stale job", + job_id=row.job_id, + error=str(job_error)) + + if recovered_jobs: + await self.session.commit() + logger.info("Stale job recovery completed", + recovered_count=len(recovered_jobs), + stale_threshold_minutes=stale_threshold_minutes) + + return recovered_jobs + + except Exception as e: + logger.error("Failed to recover stale jobs", + error=str(e)) + await self.session.rollback() + return [] \ No newline at end of file diff --git a/services/training/app/utils/distributed_lock.py b/services/training/app/utils/distributed_lock.py index c167b006..c63084db 100644 --- a/services/training/app/utils/distributed_lock.py +++ b/services/training/app/utils/distributed_lock.py @@ -1,10 +1,16 @@ """ Distributed Locking Mechanisms Prevents concurrent training jobs for the same product + +HORIZONTAL SCALING FIX: +- Uses SHA256 for stable hash across all Python processes/pods +- Python's built-in hash() varies between processes due to hash randomization (Python 3.3+) +- This ensures all pods compute the same lock ID for the same lock name """ import asyncio import time +import hashlib from typing import Optional import logging from contextlib import asynccontextmanager @@ -39,9 +45,20 @@ class DatabaseLock: self.lock_id = self._hash_lock_name(lock_name) def _hash_lock_name(self, name: str) -> int: - """Convert lock name to integer ID for PostgreSQL advisory lock""" - # Use hash and modulo to get a positive 32-bit integer - return abs(hash(name)) % (2**31) + """ + Convert lock name to integer ID for PostgreSQL advisory lock. + + CRITICAL: Uses SHA256 for stable hash across all Python processes/pods. + Python's built-in hash() varies between processes due to hash randomization + (PYTHONHASHSEED, enabled by default since Python 3.3), which would cause + different pods to compute different lock IDs for the same lock name, + defeating the purpose of distributed locking. 
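+
+        Illustrative property (hypothetical lock name):
+
+            int.from_bytes(hashlib.sha256(b"train:tenant-42").digest()[:4], 'big') % (2**31)
+
+        yields the same integer in every pod and after every restart, whereas
+        abs(hash("train:tenant-42")) % (2**31) can differ per process because
+        of hash randomization.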
+ """ + # Use SHA256 for stable, cross-process hash + hash_bytes = hashlib.sha256(name.encode('utf-8')).digest() + # Take first 4 bytes and convert to positive 31-bit integer + # (PostgreSQL advisory locks use bigint, but we use 31-bit for safety) + return int.from_bytes(hash_bytes[:4], 'big') % (2**31) @asynccontextmanager async def acquire(self, session: AsyncSession): diff --git a/services/training/app/websocket/manager.py b/services/training/app/websocket/manager.py index e8e81245..a9a6b89c 100644 --- a/services/training/app/websocket/manager.py +++ b/services/training/app/websocket/manager.py @@ -1,21 +1,39 @@ """ WebSocket Connection Manager for Training Service Manages WebSocket connections and broadcasts RabbitMQ events to connected clients + +HORIZONTAL SCALING: +- Uses Redis pub/sub for cross-pod WebSocket broadcasting +- Each pod subscribes to a Redis channel and broadcasts to its local connections +- Events published to Redis are received by all pods, ensuring clients on any + pod receive events from training jobs running on any other pod """ import asyncio import json -from typing import Dict, Set +import os +from typing import Dict, Optional from fastapi import WebSocket import structlog logger = structlog.get_logger() +# Redis pub/sub channel for WebSocket events +REDIS_WEBSOCKET_CHANNEL = "training:websocket:events" + class WebSocketConnectionManager: """ - Simple WebSocket connection manager. - Manages connections per job_id and broadcasts messages to all connected clients. + WebSocket connection manager with Redis pub/sub for horizontal scaling. + + In a multi-pod deployment: + 1. Events are published to Redis pub/sub (not just local broadcast) + 2. Each pod subscribes to Redis and broadcasts to its local WebSocket connections + 3. This ensures clients connected to any pod receive events from any pod + + Flow: + - RabbitMQ event → Pod A receives → Pod A publishes to Redis + - Redis pub/sub → All pods receive → Each pod broadcasts to local WebSockets """ def __init__(self): @@ -24,6 +42,121 @@ class WebSocketConnectionManager: self._lock = asyncio.Lock() # Store latest event for each job to provide initial state self._latest_events: Dict[str, dict] = {} + # Redis client for pub/sub + self._redis: Optional[object] = None + self._pubsub: Optional[object] = None + self._subscriber_task: Optional[asyncio.Task] = None + self._running = False + self._instance_id = f"{os.environ.get('HOSTNAME', 'unknown')}:{os.getpid()}" + + async def initialize_redis(self, redis_url: str) -> bool: + """ + Initialize Redis connection for cross-pod pub/sub. 
+ + Args: + redis_url: Redis connection URL + + Returns: + True if successful, False otherwise + """ + try: + import redis.asyncio as redis_async + + self._redis = redis_async.from_url(redis_url, decode_responses=True) + await self._redis.ping() + + # Create pub/sub subscriber + self._pubsub = self._redis.pubsub() + await self._pubsub.subscribe(REDIS_WEBSOCKET_CHANNEL) + + # Start subscriber task + self._running = True + self._subscriber_task = asyncio.create_task(self._redis_subscriber_loop()) + + logger.info("Redis pub/sub initialized for WebSocket broadcasting", + instance_id=self._instance_id, + channel=REDIS_WEBSOCKET_CHANNEL) + return True + + except Exception as e: + logger.error("Failed to initialize Redis pub/sub", + error=str(e), + instance_id=self._instance_id) + return False + + async def shutdown(self): + """Shutdown Redis pub/sub connection""" + self._running = False + + if self._subscriber_task: + self._subscriber_task.cancel() + try: + await self._subscriber_task + except asyncio.CancelledError: + pass + + if self._pubsub: + await self._pubsub.unsubscribe(REDIS_WEBSOCKET_CHANNEL) + await self._pubsub.close() + + if self._redis: + await self._redis.close() + + logger.info("Redis pub/sub shutdown complete", + instance_id=self._instance_id) + + async def _redis_subscriber_loop(self): + """Background task to receive Redis pub/sub messages and broadcast locally""" + try: + while self._running: + try: + message = await self._pubsub.get_message( + ignore_subscribe_messages=True, + timeout=1.0 + ) + + if message and message['type'] == 'message': + await self._handle_redis_message(message['data']) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error("Error in Redis subscriber loop", + error=str(e), + instance_id=self._instance_id) + await asyncio.sleep(1) # Backoff on error + + except asyncio.CancelledError: + pass + + logger.info("Redis subscriber loop stopped", + instance_id=self._instance_id) + + async def _handle_redis_message(self, data: str): + """Handle a message received from Redis pub/sub""" + try: + payload = json.loads(data) + job_id = payload.get('job_id') + message = payload.get('message') + source_instance = payload.get('source_instance') + + if not job_id or not message: + return + + # Log cross-pod message + if source_instance != self._instance_id: + logger.debug("Received cross-pod WebSocket event", + job_id=job_id, + source_instance=source_instance, + local_instance=self._instance_id) + + # Broadcast to local WebSocket connections + await self._broadcast_local(job_id, message) + + except json.JSONDecodeError as e: + logger.warning("Invalid JSON in Redis message", error=str(e)) + except Exception as e: + logger.error("Error handling Redis message", error=str(e)) async def connect(self, job_id: str, websocket: WebSocket) -> None: """Register a new WebSocket connection for a job""" @@ -50,7 +183,8 @@ class WebSocketConnectionManager: logger.info("WebSocket connected", job_id=job_id, websocket_id=ws_id, - total_connections=len(self._connections[job_id])) + total_connections=len(self._connections[job_id]), + instance_id=self._instance_id) async def disconnect(self, job_id: str, websocket: WebSocket) -> None: """Remove a WebSocket connection""" @@ -66,19 +200,56 @@ class WebSocketConnectionManager: logger.info("WebSocket disconnected", job_id=job_id, websocket_id=ws_id, - remaining_connections=len(self._connections.get(job_id, {}))) + remaining_connections=len(self._connections.get(job_id, {})), + instance_id=self._instance_id) async def 
broadcast(self, job_id: str, message: dict) -> int: """ - Broadcast a message to all connections for a specific job. - Returns the number of successful broadcasts. + Broadcast a message to all connections for a specific job across ALL pods. + + If Redis is configured, publishes to Redis pub/sub which then broadcasts + to all pods. Otherwise, falls back to local-only broadcast. + + Returns the number of successful local broadcasts. """ # Store the latest event for this job to provide initial state to new connections - if message.get('type') != 'initial_state': # Don't store initial_state messages + if message.get('type') != 'initial_state': self._latest_events[job_id] = message + # If Redis is available, publish to Redis for cross-pod broadcast + if self._redis: + try: + payload = json.dumps({ + 'job_id': job_id, + 'message': message, + 'source_instance': self._instance_id + }) + await self._redis.publish(REDIS_WEBSOCKET_CHANNEL, payload) + logger.debug("Published WebSocket event to Redis", + job_id=job_id, + message_type=message.get('type'), + instance_id=self._instance_id) + # Return 0 here because the actual broadcast happens via subscriber + # The count will be from _broadcast_local when the message is received + return 0 + except Exception as e: + logger.warning("Failed to publish to Redis, falling back to local broadcast", + error=str(e), + job_id=job_id) + # Fall through to local broadcast + + # Local-only broadcast (when Redis is not available) + return await self._broadcast_local(job_id, message) + + async def _broadcast_local(self, job_id: str, message: dict) -> int: + """ + Broadcast a message to local WebSocket connections only. + This is called either directly (no Redis) or from Redis subscriber. + """ if job_id not in self._connections: - logger.debug("No active connections for job", job_id=job_id) + logger.debug("No active local connections for job", + job_id=job_id, + instance_id=self._instance_id) return 0 connections = list(self._connections[job_id].values()) @@ -103,18 +274,27 @@ class WebSocketConnectionManager: self._connections[job_id].pop(ws_id, None) if successful_sends > 0: - logger.info("Broadcasted message to WebSocket clients", + logger.info("Broadcasted message to local WebSocket clients", job_id=job_id, message_type=message.get('type'), successful_sends=successful_sends, - failed_sends=len(failed_websockets)) + failed_sends=len(failed_websockets), + instance_id=self._instance_id) return successful_sends def get_connection_count(self, job_id: str) -> int: - """Get the number of active connections for a job""" + """Get the number of active local connections for a job""" return len(self._connections.get(job_id, {})) + def get_total_connection_count(self) -> int: + """Get total number of active connections across all jobs""" + return sum(len(conns) for conns in self._connections.values()) + + def is_redis_enabled(self) -> bool: + """Check if Redis pub/sub is enabled""" + return self._redis is not None and self._running + # Global singleton instance websocket_manager = WebSocketConnectionManager() diff --git a/services/training/migrations/versions/add_horizontal_scaling_constraints.py b/services/training/migrations/versions/add_horizontal_scaling_constraints.py new file mode 100644 index 00000000..25cf1696 --- /dev/null +++ b/services/training/migrations/versions/add_horizontal_scaling_constraints.py @@ -0,0 +1,60 @@ +"""Add horizontal scaling constraints for multi-pod deployment + +Revision ID: add_horizontal_scaling +Revises: 26a665cd5348 +Create Date: 2025-01-18 
+ +This migration adds database-level constraints to prevent race conditions +when running multiple training service pods: + +1. Partial unique index on model_training_logs to prevent duplicate active jobs per tenant +2. Index to speed up active job lookups +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'add_horizontal_scaling' +down_revision: Union[str, None] = '26a665cd5348' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add partial unique index to prevent duplicate active training jobs per tenant + # This ensures only ONE job can be in 'pending' or 'running' status per tenant at a time + # The constraint is enforced at the database level, preventing race conditions + # between multiple pods checking and creating jobs simultaneously + op.execute(""" + CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_active_training_per_tenant + ON model_training_logs (tenant_id) + WHERE status IN ('pending', 'running') + """) + + # Add index to speed up active job lookups (used by deduplication check) + op.create_index( + 'idx_training_logs_tenant_status', + 'model_training_logs', + ['tenant_id', 'status'], + unique=False, + if_not_exists=True + ) + + # Add index for job recovery queries (find stale running jobs) + op.create_index( + 'idx_training_logs_status_updated', + 'model_training_logs', + ['status', 'updated_at'], + unique=False, + if_not_exists=True + ) + + +def downgrade() -> None: + # Remove the indexes in reverse order + op.execute("DROP INDEX IF EXISTS idx_training_logs_status_updated") + op.execute("DROP INDEX IF EXISTS idx_training_logs_tenant_status") + op.execute("DROP INDEX IF EXISTS idx_unique_active_training_per_tenant") diff --git a/shared/leader_election/__init__.py b/shared/leader_election/__init__.py new file mode 100644 index 00000000..014a4a39 --- /dev/null +++ b/shared/leader_election/__init__.py @@ -0,0 +1,33 @@ +""" +Shared Leader Election for Bakery-IA platform + +Provides Redis-based leader election for services that need to run +singleton scheduled tasks (APScheduler, background jobs, etc.) + +Usage: + from shared.leader_election import LeaderElectionService, SchedulerLeaderMixin + + # Option 1: Direct usage + leader_election = LeaderElectionService(redis_client, "my-service") + await leader_election.start( + on_become_leader=start_scheduler, + on_lose_leader=stop_scheduler + ) + + # Option 2: Mixin for services with APScheduler + class MySchedulerService(SchedulerLeaderMixin): + async def _create_scheduler_jobs(self): + self.scheduler.add_job(...) +""" + +from shared.leader_election.service import ( + LeaderElectionService, + LeaderElectionConfig, +) +from shared.leader_election.mixin import SchedulerLeaderMixin + +__all__ = [ + "LeaderElectionService", + "LeaderElectionConfig", + "SchedulerLeaderMixin", +] diff --git a/shared/leader_election/mixin.py b/shared/leader_election/mixin.py new file mode 100644 index 00000000..52c1555d --- /dev/null +++ b/shared/leader_election/mixin.py @@ -0,0 +1,209 @@ +""" +Scheduler Leader Mixin + +Provides a mixin class for services that use APScheduler and need +leader election for horizontal scaling. 
+ +Usage: + class MySchedulerService(SchedulerLeaderMixin): + def __init__(self, redis_url: str, service_name: str): + super().__init__(redis_url, service_name) + # Your initialization here + + async def _create_scheduler_jobs(self): + '''Override to define your scheduled jobs''' + self.scheduler.add_job( + self.my_job, + trigger=CronTrigger(hour=0), + id='my_job' + ) + + async def my_job(self): + # Your job logic here + pass +""" + +import asyncio +from typing import Optional +from abc import abstractmethod +import structlog + +logger = structlog.get_logger() + + +class SchedulerLeaderMixin: + """ + Mixin for services that use APScheduler with leader election. + + Provides automatic leader election and scheduler management. + Only the leader pod will run scheduled jobs. + """ + + def __init__(self, redis_url: str, service_name: str, **kwargs): + """ + Initialize the scheduler with leader election. + + Args: + redis_url: Redis connection URL for leader election + service_name: Unique service name for leader election lock + **kwargs: Additional arguments passed to parent class + """ + super().__init__(**kwargs) + + self._redis_url = redis_url + self._service_name = service_name + self._leader_election = None + self._redis_client = None + self.scheduler = None + self._scheduler_started = False + + async def start_with_leader_election(self): + """ + Start the service with leader election. + + Only the leader will start the scheduler. + """ + from apscheduler.schedulers.asyncio import AsyncIOScheduler + from shared.leader_election.service import LeaderElectionService + import redis.asyncio as redis + + try: + # Create Redis connection + self._redis_client = redis.from_url(self._redis_url, decode_responses=False) + await self._redis_client.ping() + + # Create scheduler (but don't start it yet) + self.scheduler = AsyncIOScheduler() + + # Create leader election + self._leader_election = LeaderElectionService( + self._redis_client, + self._service_name + ) + + # Start leader election with callbacks + await self._leader_election.start( + on_become_leader=self._on_become_leader, + on_lose_leader=self._on_lose_leader + ) + + logger.info("Scheduler service started with leader election", + service=self._service_name, + is_leader=self._leader_election.is_leader, + instance_id=self._leader_election.instance_id) + + except Exception as e: + logger.error("Failed to start with leader election, falling back to standalone", + service=self._service_name, + error=str(e)) + # Fallback: start scheduler anyway (for single-pod deployments) + await self._start_scheduler_standalone() + + async def _on_become_leader(self): + """Called when this instance becomes the leader""" + logger.info("Became leader, starting scheduler", + service=self._service_name) + await self._start_scheduler() + + async def _on_lose_leader(self): + """Called when this instance loses leadership""" + logger.warning("Lost leadership, stopping scheduler", + service=self._service_name) + await self._stop_scheduler() + + async def _start_scheduler(self): + """Start the scheduler with defined jobs""" + if self._scheduler_started: + logger.warning("Scheduler already started", + service=self._service_name) + return + + try: + # Let subclass define jobs + await self._create_scheduler_jobs() + + # Start scheduler + if not self.scheduler.running: + self.scheduler.start() + self._scheduler_started = True + logger.info("Scheduler started", + service=self._service_name, + job_count=len(self.scheduler.get_jobs())) + + except Exception as e: + 
logger.error("Failed to start scheduler", + service=self._service_name, + error=str(e)) + + async def _stop_scheduler(self): + """Stop the scheduler""" + if not self._scheduler_started: + return + + try: + if self.scheduler and self.scheduler.running: + self.scheduler.shutdown(wait=False) + self._scheduler_started = False + logger.info("Scheduler stopped", + service=self._service_name) + + except Exception as e: + logger.error("Failed to stop scheduler", + service=self._service_name, + error=str(e)) + + async def _start_scheduler_standalone(self): + """Start scheduler without leader election (fallback mode)""" + from apscheduler.schedulers.asyncio import AsyncIOScheduler + + logger.warning("Starting scheduler in standalone mode (no leader election)", + service=self._service_name) + + self.scheduler = AsyncIOScheduler() + await self._create_scheduler_jobs() + + if not self.scheduler.running: + self.scheduler.start() + self._scheduler_started = True + + @abstractmethod + async def _create_scheduler_jobs(self): + """ + Override to define scheduled jobs. + + Example: + self.scheduler.add_job( + self.my_task, + trigger=CronTrigger(hour=0, minute=30), + id='my_task', + max_instances=1 + ) + """ + pass + + async def stop(self): + """Stop the scheduler and leader election""" + # Stop leader election + if self._leader_election: + await self._leader_election.stop() + + # Stop scheduler + await self._stop_scheduler() + + # Close Redis + if self._redis_client: + await self._redis_client.close() + + logger.info("Scheduler service stopped", + service=self._service_name) + + @property + def is_leader(self) -> bool: + """Check if this instance is the leader""" + return self._leader_election.is_leader if self._leader_election else False + + def get_leader_status(self) -> dict: + """Get leader election status""" + if self._leader_election: + return self._leader_election.get_status() + return {"is_leader": True, "mode": "standalone"} diff --git a/shared/leader_election/service.py b/shared/leader_election/service.py new file mode 100644 index 00000000..cf6d49db --- /dev/null +++ b/shared/leader_election/service.py @@ -0,0 +1,352 @@ +""" +Leader Election Service + +Implements Redis-based leader election to ensure only ONE pod runs +singleton tasks like APScheduler jobs. + +This is CRITICAL for horizontal scaling - without leader election, +each pod would run the same scheduled jobs, causing: +- Duplicate operations (forecasts, alerts, syncs) +- Database contention +- Inconsistent state +- Duplicate notifications + +Implementation: +- Uses Redis SET NX (set if not exists) for atomic leadership acquisition +- Leader maintains leadership with periodic heartbeats +- If leader fails to heartbeat, another pod can take over +- Non-leader pods check periodically if they should become leader +""" + +import asyncio +import os +import socket +from dataclasses import dataclass +from typing import Optional, Callable, Awaitable +import structlog + +logger = structlog.get_logger() + + +@dataclass +class LeaderElectionConfig: + """Configuration for leader election""" + # Redis key prefix for the lock + lock_key_prefix: str = "leader" + # Lock expires after this many seconds without refresh + lock_ttl_seconds: int = 30 + # Refresh lock every N seconds (should be < lock_ttl_seconds / 2) + heartbeat_interval_seconds: int = 10 + # Non-leaders check for leadership every N seconds + election_check_interval_seconds: int = 15 + + +class LeaderElectionService: + """ + Redis-based leader election service. 

    Ensures only one pod runs scheduled tasks at a time across all replicas.
    """

    def __init__(
        self,
        redis_client,
        service_name: str,
        config: Optional[LeaderElectionConfig] = None
    ):
        """
        Initialize leader election service.

        Args:
            redis_client: Async Redis client instance
            service_name: Unique name for this service (used in Redis key)
            config: Optional configuration override
        """
        self.redis = redis_client
        self.service_name = service_name
        self.config = config or LeaderElectionConfig()
        self.lock_key = f"{self.config.lock_key_prefix}:{service_name}:lock"
        self.instance_id = self._generate_instance_id()
        self.is_leader = False
        self._heartbeat_task: Optional[asyncio.Task] = None
        self._election_task: Optional[asyncio.Task] = None
        self._running = False
        self._on_become_leader_callback: Optional[Callable[[], Awaitable[None]]] = None
        self._on_lose_leader_callback: Optional[Callable[[], Awaitable[None]]] = None

    def _generate_instance_id(self) -> str:
        """Generate unique instance identifier for this pod"""
        hostname = os.environ.get('HOSTNAME', socket.gethostname())
        pod_ip = os.environ.get('POD_IP', 'unknown')
        return f"{hostname}:{pod_ip}:{os.getpid()}"

    async def start(
        self,
        on_become_leader: Optional[Callable[[], Awaitable[None]]] = None,
        on_lose_leader: Optional[Callable[[], Awaitable[None]]] = None
    ):
        """
        Start leader election process.

        Args:
            on_become_leader: Async callback when this instance becomes leader
            on_lose_leader: Async callback when this instance loses leadership
        """
        self._on_become_leader_callback = on_become_leader
        self._on_lose_leader_callback = on_lose_leader
        self._running = True

        logger.info("Starting leader election",
                   service=self.service_name,
                   instance_id=self.instance_id,
                   lock_key=self.lock_key)

        # Try to become leader immediately
        await self._try_become_leader()

        # Start background tasks
        if self.is_leader:
            self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        else:
            self._election_task = asyncio.create_task(self._election_loop())

    async def stop(self):
        """Stop leader election and release leadership if held"""
        self._running = False

        # Cancel background tasks
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
            try:
                await self._heartbeat_task
            except asyncio.CancelledError:
                pass
            self._heartbeat_task = None

        if self._election_task:
            self._election_task.cancel()
            try:
                await self._election_task
            except asyncio.CancelledError:
                pass
            self._election_task = None

        # Release leadership (capture the flag first: _release_leadership()
        # clears self.is_leader, so logging it afterwards would always be False)
        was_leader = self.is_leader
        if was_leader:
            await self._release_leadership()

        logger.info("Leader election stopped",
                   service=self.service_name,
                   instance_id=self.instance_id,
                   was_leader=was_leader)

    async def _try_become_leader(self) -> bool:
        """
        Attempt to become the leader.
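        Acquisition is a single atomic Redis SET with NX and EX, so at most
        one instance can hold the lock at any time.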
+ + Returns: + True if this instance is now the leader + """ + try: + # Try to set the lock with NX (only if not exists) and EX (expiry) + acquired = await self.redis.set( + self.lock_key, + self.instance_id, + nx=True, # Only set if not exists + ex=self.config.lock_ttl_seconds + ) + + if acquired: + self.is_leader = True + logger.info("Became leader", + service=self.service_name, + instance_id=self.instance_id) + + # Call callback + if self._on_become_leader_callback: + try: + await self._on_become_leader_callback() + except Exception as e: + logger.error("Error in on_become_leader callback", + service=self.service_name, + error=str(e)) + + return True + + # Check if we're already the leader (reconnection scenario) + current_leader = await self.redis.get(self.lock_key) + if current_leader: + current_leader_str = current_leader.decode() if isinstance(current_leader, bytes) else current_leader + if current_leader_str == self.instance_id: + self.is_leader = True + logger.info("Confirmed as existing leader", + service=self.service_name, + instance_id=self.instance_id) + return True + else: + logger.debug("Another instance is leader", + service=self.service_name, + current_leader=current_leader_str, + this_instance=self.instance_id) + + return False + + except Exception as e: + logger.error("Failed to acquire leadership", + service=self.service_name, + instance_id=self.instance_id, + error=str(e)) + return False + + async def _release_leadership(self): + """Release leadership lock""" + try: + # Only delete if we're the current leader + current_leader = await self.redis.get(self.lock_key) + if current_leader: + current_leader_str = current_leader.decode() if isinstance(current_leader, bytes) else current_leader + if current_leader_str == self.instance_id: + await self.redis.delete(self.lock_key) + logger.info("Released leadership", + service=self.service_name, + instance_id=self.instance_id) + + was_leader = self.is_leader + self.is_leader = False + + # Call callback only if we were the leader + if was_leader and self._on_lose_leader_callback: + try: + await self._on_lose_leader_callback() + except Exception as e: + logger.error("Error in on_lose_leader callback", + service=self.service_name, + error=str(e)) + + except Exception as e: + logger.error("Failed to release leadership", + service=self.service_name, + instance_id=self.instance_id, + error=str(e)) + + async def _refresh_leadership(self) -> bool: + """ + Refresh leadership lock TTL. 
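        The stored owner is checked before the TTL is extended, so this pod
        never refreshes a lock that another instance has taken over.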
+ + Returns: + True if leadership was maintained + """ + try: + # Verify we're still the leader + current_leader = await self.redis.get(self.lock_key) + if not current_leader: + logger.warning("Lost leadership (lock expired)", + service=self.service_name, + instance_id=self.instance_id) + return False + + current_leader_str = current_leader.decode() if isinstance(current_leader, bytes) else current_leader + if current_leader_str != self.instance_id: + logger.warning("Lost leadership (lock held by another instance)", + service=self.service_name, + instance_id=self.instance_id, + current_leader=current_leader_str) + return False + + # Refresh the TTL + await self.redis.expire(self.lock_key, self.config.lock_ttl_seconds) + return True + + except Exception as e: + logger.error("Failed to refresh leadership", + service=self.service_name, + instance_id=self.instance_id, + error=str(e)) + return False + + async def _heartbeat_loop(self): + """Background loop to maintain leadership""" + while self._running and self.is_leader: + try: + await asyncio.sleep(self.config.heartbeat_interval_seconds) + + if not self._running: + break + + maintained = await self._refresh_leadership() + + if not maintained: + self.is_leader = False + + # Call callback + if self._on_lose_leader_callback: + try: + await self._on_lose_leader_callback() + except Exception as e: + logger.error("Error in on_lose_leader callback", + service=self.service_name, + error=str(e)) + + # Switch to election loop + self._election_task = asyncio.create_task(self._election_loop()) + break + + except asyncio.CancelledError: + break + except Exception as e: + logger.error("Error in heartbeat loop", + service=self.service_name, + instance_id=self.instance_id, + error=str(e)) + + async def _election_loop(self): + """Background loop to attempt leadership acquisition""" + while self._running and not self.is_leader: + try: + await asyncio.sleep(self.config.election_check_interval_seconds) + + if not self._running: + break + + acquired = await self._try_become_leader() + + if acquired: + # Switch to heartbeat loop + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + break + + except asyncio.CancelledError: + break + except Exception as e: + logger.error("Error in election loop", + service=self.service_name, + instance_id=self.instance_id, + error=str(e)) + + def get_status(self) -> dict: + """Get current leader election status""" + return { + "service": self.service_name, + "instance_id": self.instance_id, + "is_leader": self.is_leader, + "running": self._running, + "lock_key": self.lock_key, + "config": { + "lock_ttl_seconds": self.config.lock_ttl_seconds, + "heartbeat_interval_seconds": self.config.heartbeat_interval_seconds, + "election_check_interval_seconds": self.config.election_check_interval_seconds + } + } + + async def get_current_leader(self) -> Optional[str]: + """Get the current leader instance ID (if any)""" + try: + current_leader = await self.redis.get(self.lock_key) + if current_leader: + return current_leader.decode() if isinstance(current_leader, bytes) else current_leader + return None + except Exception as e: + logger.error("Failed to get current leader", + service=self.service_name, + error=str(e)) + return None
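
The two modules above are intended to be used together: a service subclasses `SchedulerLeaderMixin`, registers its jobs in `_create_scheduler_jobs()`, and calls `start_with_leader_election()` at startup so that only the elected pod runs APScheduler. The sketch below illustrates that wiring. It is not part of the patch; the service class, job, Redis URL and FastAPI lifespan hook are illustrative assumptions, and only the mixin API (`start_with_leader_election`, `stop`, `get_leader_status`, `_create_scheduler_jobs`) is taken from `shared/leader_election/mixin.py`.

```python
# Illustrative sketch only: InventoryAlertService, its job, the Redis URL and
# the FastAPI lifespan hook are assumptions; the mixin API is the one defined
# in shared/leader_election/mixin.py.
from contextlib import asynccontextmanager

from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI

from shared.leader_election.mixin import SchedulerLeaderMixin


class InventoryAlertService(SchedulerLeaderMixin):
    """Hypothetical service whose nightly job must run on exactly one pod."""

    def __init__(self, redis_url: str):
        super().__init__(redis_url=redis_url, service_name="inventory-alerts")

    async def _create_scheduler_jobs(self):
        # Only the leader pod starts the scheduler, so this job runs on a single replica.
        self.scheduler.add_job(
            self.send_low_stock_alerts,
            trigger=CronTrigger(hour=6, minute=0),
            id="send_low_stock_alerts",
            max_instances=1,
        )

    async def send_low_stock_alerts(self):
        ...  # business logic lives here


service = InventoryAlertService(redis_url="redis://redis:6379/0")


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Every replica runs this; only the elected leader actually starts APScheduler.
    await service.start_with_leader_election()
    yield
    await service.stop()


app = FastAPI(lifespan=lifespan)


@app.get("/leader-status")
async def leader_status():
    # Useful when debugging which pod currently owns the scheduler.
    return service.get_leader_status()
```

Because non-leader replicas only poll Redis for the lock, every pod can call `start_with_leader_election()` unconditionally; scaling the Deployment up or down changes which pod owns the scheduler, not how the service starts.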