diff --git a/online-feature-store/dev-toggle.sh b/dev-toggle-go.sh similarity index 52% rename from online-feature-store/dev-toggle.sh rename to dev-toggle-go.sh index 1d513130..aad14f52 100755 --- a/online-feature-store/dev-toggle.sh +++ b/dev-toggle-go.sh @@ -2,14 +2,8 @@ set -e +# Get the script directory (parent directory where this script lives) SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -STATE_FILE="$SCRIPT_DIR/.dev-toggle-state" -GO_MOD_FILE="$SCRIPT_DIR/go.mod" -GO_MOD_APPEND_FILE="$SCRIPT_DIR/.go.mod.appended" - -INTERNAL_REPO_URL="https://github.com/Meesho/BharatMLStack-internal-configs" -INTERNAL_REPO_DIR="$SCRIPT_DIR/.internal-configs" -INTERNAL_OFS_DIR="$INTERNAL_REPO_DIR/online-feature-store" # Colors for output RED='\033[0;31m' @@ -18,14 +12,168 @@ YELLOW='\033[1;33m' BLUE='\033[0;34m' NC='\033[0m' # No Color +get_available_folders() { + local folders=() + for dir in "$SCRIPT_DIR"/*; do + if [ -d "$dir" ] && [ -f "$dir/go.mod" ]; then + local folder_name=$(basename "$dir") + folders+=("$folder_name") + fi + done + echo "${folders[@]}" +} + +interactive_select_folder() { + local available_folders=($(get_available_folders)) + + if [ ${#available_folders[@]} -eq 0 ]; then + log_error "No folders with go.mod found in $SCRIPT_DIR" + exit 1 + fi + + echo "" >&2 + echo "==========================================" >&2 + echo " Step 1: Select Folder(s)" >&2 + echo "==========================================" >&2 + echo "" >&2 + echo "Available folders:" >&2 + local index=1 + for folder in "${available_folders[@]}"; do + echo " [$index] $folder" >&2 + ((index++)) + done + echo "" >&2 + echo "You can select:" >&2 + echo " • Single folder: Enter a number (e.g., 1)" >&2 + echo " • Multiple folders: Enter numbers separated by comma or space (e.g., 1,2 or 1 2 3)" >&2 + echo "" >&2 + + while true; do + read -p "Select folder(s): " selection + + # Remove any extra spaces + selection=$(echo "$selection" | tr -s ' ' | tr ',' ' ') + + # Validate all selections + local valid=true + local selected_folders=() + local IFS=' ' + for num in $selection; do + if [[ "$num" =~ ^[0-9]+$ ]] && [ "$num" -ge 1 ] && [ "$num" -le ${#available_folders[@]} ]; then + selected_folders+=("${available_folders[$((num-1))]}") + else + valid=false + break + fi + done + + if [ "$valid" = true ] && [ ${#selected_folders[@]} -gt 0 ]; then + # Remove duplicates + local unique_folders=() + for folder in "${selected_folders[@]}"; do + local is_duplicate=false + for unique in "${unique_folders[@]}"; do + if [ "$folder" = "$unique" ]; then + is_duplicate=true + break + fi + done + if [ "$is_duplicate" = false ]; then + unique_folders+=("$folder") + fi + done + + # Output folders separated by space (will be captured as array) to stdout + echo "${unique_folders[@]}" + return 0 + else + echo "Invalid selection. Please enter number(s) between 1 and ${#available_folders[@]}." >&2 + echo " Examples: 1 or 1,2 or 1 2 3" >&2 + fi + done +} + +interactive_select_command() { + echo "" >&2 + echo "==========================================" >&2 + echo " Step 2: Select Command" >&2 + echo "==========================================" >&2 + echo "" >&2 + echo "Available commands:" >&2 + echo " [1] enable - Enable development mode (clone internal repo, copy files, update go.mod)" >&2 + echo " [2] disable - Disable development mode (remove copied files and go.mod changes)" >&2 + echo " [3] status - Show current development mode status" >&2 + echo " [4] update - Update internal configs (pull latest from internal repo)" >&2 + echo "" >&2 + + while true; do + read -p "Select command (1-4): " selection + case "$selection" in + 1) + echo "enable" + return 0 + ;; + 2) + echo "disable" + return 0 + ;; + 3) + echo "status" + return 0 + ;; + 4) + echo "update" + return 0 + ;; + *) + echo "Invalid selection. Please enter a number between 1 and 4." >&2 + ;; + esac + done +} + print_usage() { - echo "Usage: $0 [enable|disable|status|update]" + echo "Usage: $0 " + echo "" + echo "Parameters:" + echo " 1. folder-name - Name of the folder to operate on" + echo "" + + # Detect and show available folders + local available_folders=($(get_available_folders)) + if [ ${#available_folders[@]} -gt 0 ]; then + echo " Available folders:" + for folder in "${available_folders[@]}"; do + echo " • $folder" + done + else + echo " Available folders: (none found with go.mod)" + fi + echo "" - echo "Commands:" - echo " enable - Enable development mode (clone internal repo, copy files, update go.mod)" - echo " disable - Disable development mode (remove copied files and go.mod changes)" - echo " status - Show current development mode status" - echo " update - Update internal configs (pull latest from internal repo)" + echo " 2. command - Action to perform" + echo "" + echo " Available commands:" + echo " • enable - Enable development mode (clone internal repo, copy files, update go.mod)" + echo " • disable - Disable development mode (remove copied files and go.mod changes)" + echo " • status - Show current development mode status" + echo " • update - Update internal configs (pull latest from internal repo)" + echo "" + echo "Examples:" + if [ ${#available_folders[@]} -gt 0 ]; then + local first_folder="${available_folders[0]}" + echo " $0 $first_folder enable" + if [ ${#available_folders[@]} -gt 1 ]; then + local second_folder="${available_folders[1]}" + echo " $0 $second_folder status" + fi + echo " $0 $first_folder disable" + echo " $0 $first_folder update" + else + echo " $0 online-feature-store enable" + echo " $0 horizon status" + echo " $0 online-feature-store disable" + fi echo "" exit 1 } @@ -46,9 +194,53 @@ log_debug() { echo -e "${BLUE}[DEBUG]${NC} $1" } +# Validate folder name and set up paths +validate_and_setup() { + FOLDER_NAME="${1:-}" + COMMAND="${2:-}" + + if [ -z "$FOLDER_NAME" ]; then + log_error "Folder name is required" + print_usage + fi + + if [ -z "$COMMAND" ]; then + log_error "Command is required" + print_usage + fi + + # Set up paths based on folder name + TARGET_DIR="$SCRIPT_DIR/$FOLDER_NAME" + STATE_FILE="$TARGET_DIR/.dev-toggle-state" + GO_MOD_FILE="$TARGET_DIR/go.mod" + GO_MOD_APPEND_FILE="$TARGET_DIR/.go.mod.appended" + + INTERNAL_REPO_URL="https://github.com/Meesho/BharatMLStack-internal-configs" + INTERNAL_REPO_DIR="$TARGET_DIR/.internal-configs" + INTERNAL_FOLDER_DIR="$INTERNAL_REPO_DIR/$FOLDER_NAME" + + # Validate that target directory exists + if [ ! -d "$TARGET_DIR" ]; then + log_error "Target directory does not exist: $TARGET_DIR" + exit 1 + fi + + # Validate that go.mod exists in target directory + if [ ! -f "$GO_MOD_FILE" ]; then + log_error "go.mod not found in target directory: $TARGET_DIR" + log_error "This script is designed for Go projects with go.mod files" + exit 1 + fi + + log_debug "Target directory: $TARGET_DIR" + log_debug "State file: $STATE_FILE" + log_debug "Internal repo directory: $INTERNAL_REPO_DIR" + log_debug "Internal folder directory: $INTERNAL_FOLDER_DIR" +} + check_status() { log_info "==========================================" - log_info "Development Mode Status" + log_info "Development Mode Status for: $FOLDER_NAME" log_info "==========================================" if [ -f "$STATE_FILE" ]; then @@ -96,7 +288,7 @@ check_status() { echo "Status: DISABLED" echo "" echo "No active development mode configuration found." - echo "Run './dev-toggle.sh enable' to enable development mode." + echo "Run '$0 $FOLDER_NAME enable' to enable development mode." echo "" return 1 fi @@ -127,7 +319,7 @@ clone_or_update_internal_repo() { log_debug "Found refs/remotes/origin/master" else log_error "Could not determine default branch (main or master not found)" - cd "$SCRIPT_DIR" + cd "$TARGET_DIR" exit 1 fi fi @@ -151,7 +343,7 @@ clone_or_update_internal_repo() { local commit_hash=$(git rev-parse --short HEAD) log_info "Updated to commit: $commit_hash" - cd "$SCRIPT_DIR" + cd "$TARGET_DIR" else log_info "Internal configs repo not found, cloning..." log_debug "Cloning from: $INTERNAL_REPO_URL" @@ -165,50 +357,51 @@ clone_or_update_internal_repo() { enable_dev_mode() { log_info "==========================================" - log_info "Starting development mode enablement" + log_info "Starting development mode enablement for: $FOLDER_NAME" log_info "==========================================" if [ -f "$STATE_FILE" ]; then log_warn "Development mode is already enabled!" - echo "Run '$0 disable' first to revert, or delete $STATE_FILE if state is corrupted." + echo "Run '$0 $FOLDER_NAME disable' first to revert, or delete $STATE_FILE if state is corrupted." exit 1 fi log_debug "State file: $STATE_FILE" - log_debug "Working directory: $SCRIPT_DIR" + log_debug "Working directory: $TARGET_DIR" # Clone or update the internal repo log_info "Step 1: Fetching internal configurations..." clone_or_update_internal_repo - # Check if online-feature-store directory exists in internal repo + # Check if folder directory exists in internal repo log_info "Step 2: Validating internal repository structure..." - log_debug "Looking for: $INTERNAL_OFS_DIR" - if [ ! -d "$INTERNAL_OFS_DIR" ]; then - log_error "online-feature-store directory not found in internal configs repo!" - log_error "Expected path: $INTERNAL_OFS_DIR" + log_debug "Looking for: $INTERNAL_FOLDER_DIR" + if [ ! -d "$INTERNAL_FOLDER_DIR" ]; then + log_error "$FOLDER_NAME directory not found in internal configs repo!" + log_error "Expected path: $INTERNAL_FOLDER_DIR" exit 1 fi - log_debug "✓ online-feature-store directory found" + log_debug "✓ $FOLDER_NAME directory found" # Initialize state file log_info "Step 3: Initializing state tracking..." echo "# Dev toggle state - DO NOT EDIT MANUALLY" > "$STATE_FILE" echo "# Generated on $(date)" >> "$STATE_FILE" + echo "# Folder: $FOLDER_NAME" >> "$STATE_FILE" log_debug "Created state file: $STATE_FILE" # Find and copy all .go files from internal repo log_info "Step 4: Searching for .go files to copy..." - log_debug "Scanning directory: $INTERNAL_OFS_DIR" + log_debug "Scanning directory: $INTERNAL_FOLDER_DIR" local copied_count=0 - local file_count=$(find "$INTERNAL_OFS_DIR" -name "*.go" -type f | wc -l) + local file_count=$(find "$INTERNAL_FOLDER_DIR" -name "*.go" -type f | wc -l) log_info "Found $file_count .go file(s) to copy" while IFS= read -r -d '' src_file; do - # Get relative path from INTERNAL_OFS_DIR - rel_path="${src_file#$INTERNAL_OFS_DIR/}" - dest_file="$SCRIPT_DIR/$rel_path" + # Get relative path from INTERNAL_FOLDER_DIR + rel_path="${src_file#$INTERNAL_FOLDER_DIR/}" + dest_file="$TARGET_DIR/$rel_path" dest_dir="$(dirname "$dest_file")" # Create destination directory if it doesn't exist @@ -230,17 +423,17 @@ enable_dev_mode() { # Record in state file echo "FILE:$rel_path" >> "$STATE_FILE" ((copied_count++)) - done < <(find "$INTERNAL_OFS_DIR" -name "*.go" -type f -print0) + done < <(find "$INTERNAL_FOLDER_DIR" -name "*.go" -type f -print0) log_info "Successfully copied $copied_count file(s)" # Check if there's a go.mod file in internal repo with replace directives log_info "Step 5: Processing go.mod modifications..." - if [ -f "$INTERNAL_OFS_DIR/go.mod" ]; then - log_debug "Found go.mod in internal configs: $INTERNAL_OFS_DIR/go.mod" + if [ -f "$INTERNAL_FOLDER_DIR/go.mod" ]; then + log_debug "Found go.mod in internal configs: $INTERNAL_FOLDER_DIR/go.mod" # Extract lines that start with "replace " from internal go.mod - if grep -E "^replace " "$INTERNAL_OFS_DIR/go.mod" > "$GO_MOD_APPEND_FILE"; then + if grep -E "^replace " "$INTERNAL_FOLDER_DIR/go.mod" > "$GO_MOD_APPEND_FILE"; then local replace_count=$(wc -l < "$GO_MOD_APPEND_FILE") log_info "Found $replace_count replace directive(s) to append" log_debug "Replace directives:" @@ -251,7 +444,7 @@ enable_dev_mode() { # Append to current go.mod log_info "Appending to $GO_MOD_FILE..." echo "" >> "$GO_MOD_FILE" - echo "// Added by dev-toggle.sh - DO NOT EDIT" >> "$GO_MOD_FILE" + echo "// Added by dev-toggle-go.sh - DO NOT EDIT" >> "$GO_MOD_FILE" cat "$GO_MOD_APPEND_FILE" >> "$GO_MOD_FILE" log_info "✓ Successfully appended replace directives to go.mod" @@ -260,13 +453,13 @@ enable_dev_mode() { rm -f "$GO_MOD_APPEND_FILE" fi else - log_warn "No go.mod found in internal configs at: $INTERNAL_OFS_DIR/go.mod" + log_warn "No go.mod found in internal configs at: $INTERNAL_FOLDER_DIR/go.mod" fi # Run go mod tidy log_info "Step 6: Running go mod tidy..." - log_debug "Changing to directory: $SCRIPT_DIR" - cd "$SCRIPT_DIR" + log_debug "Changing to directory: $TARGET_DIR" + cd "$TARGET_DIR" log_debug "Executing: go mod tidy" go mod tidy log_info "✓ go mod tidy completed successfully" @@ -276,6 +469,7 @@ enable_dev_mode() { log_info "==========================================" echo "" echo "Summary:" + echo " Folder: $FOLDER_NAME" echo " Files copied: $copied_count" echo " go.mod updated: $([ -f "$GO_MOD_APPEND_FILE" ] && echo "YES" || echo "NO")" echo " State file: $STATE_FILE" @@ -283,7 +477,7 @@ enable_dev_mode() { disable_dev_mode() { log_info "==========================================" - log_info "Starting development mode disablement" + log_info "Starting development mode disablement for: $FOLDER_NAME" log_info "==========================================" if [ ! -f "$STATE_FILE" ]; then @@ -294,7 +488,7 @@ disable_dev_mode() { fi log_debug "State file found: $STATE_FILE" - log_debug "Working directory: $SCRIPT_DIR" + log_debug "Working directory: $TARGET_DIR" # Remove copied files log_info "Step 1: Removing copied files..." @@ -305,7 +499,7 @@ disable_dev_mode() { while IFS= read -r line; do if [[ "$line" =~ ^FILE:(.+)$ ]]; then rel_path="${BASH_REMATCH[1]}" - file="$SCRIPT_DIR/$rel_path" + file="$TARGET_DIR/$rel_path" log_debug "Attempting to remove: $file" if [ -f "$file" ]; then @@ -326,13 +520,13 @@ disable_dev_mode() { log_debug "Found append file: $GO_MOD_APPEND_FILE" # Find the marker line and remove everything from that line onwards - if grep -q "// Added by dev-toggle.sh - DO NOT EDIT" "$GO_MOD_FILE"; then + if grep -q "// Added by dev-toggle-go.sh - DO NOT EDIT" "$GO_MOD_FILE"; then log_debug "Found marker comment in go.mod" local lines_before=$(wc -l < "$GO_MOD_FILE") # Use sed to delete from the marker line to end of file log_debug "Removing lines from marker to end of file..." - sed -i.bak '/\/\/ Added by dev-toggle.sh - DO NOT EDIT/,$d' "$GO_MOD_FILE" + sed -i.bak '/\/\/ Added by dev-toggle-go.sh - DO NOT EDIT/,$d' "$GO_MOD_FILE" rm -f "${GO_MOD_FILE}.bak" # Remove any trailing empty lines that might be left @@ -367,8 +561,8 @@ disable_dev_mode() { # Run go mod tidy log_info "Step 3: Running go mod tidy..." - log_debug "Changing to directory: $SCRIPT_DIR" - cd "$SCRIPT_DIR" + log_debug "Changing to directory: $TARGET_DIR" + cd "$TARGET_DIR" log_debug "Executing: go mod tidy" go mod tidy log_info "✓ go mod tidy completed successfully" @@ -396,13 +590,13 @@ disable_dev_mode() { update_internal_configs() { log_info "==========================================" - log_info "Updating internal configurations" + log_info "Updating internal configurations for: $FOLDER_NAME" log_info "==========================================" if [ ! -f "$STATE_FILE" ]; then log_error "Development mode is not enabled!" log_error "Expected state file: $STATE_FILE" - echo "Run '$0 enable' first." + echo "Run '$0 $FOLDER_NAME enable' first." exit 1 fi @@ -426,30 +620,98 @@ update_internal_configs() { } # Main script logic -log_debug "Script started with command: ${1:-}" -log_debug "Script directory: $SCRIPT_DIR" -log_debug "Internal repo URL: $INTERNAL_REPO_URL" -echo "" - -case "${1:-}" in - enable) - log_debug "Command: enable" - enable_dev_mode - ;; - disable) - log_debug "Command: disable" - disable_dev_mode - ;; - status) - log_debug "Command: status" - check_status - ;; - update) - log_debug "Command: update" - update_internal_configs - ;; - *) - log_error "Invalid or missing command" - print_usage - ;; -esac \ No newline at end of file +FOLDER_NAME_INPUT="${1:-}" +COMMAND="${2:-}" + +# Interactive mode: prompt for missing parameters +if [ -z "$FOLDER_NAME_INPUT" ]; then + echo "==========================================" + echo " Development Toggle for Go Projects" + echo "==========================================" + FOLDER_NAME_INPUT=$(interactive_select_folder) +fi + +if [ -z "$COMMAND" ]; then + COMMAND=$(interactive_select_command) +fi + +# Parse folder names (support multiple folders) +FOLDER_NAMES=($FOLDER_NAME_INPUT) + +# If only one folder, process it directly +if [ ${#FOLDER_NAMES[@]} -eq 1 ]; then + FOLDER_NAME="${FOLDER_NAMES[0]}" + + # Validate and set up paths + validate_and_setup "$FOLDER_NAME" "$COMMAND" + + log_debug "Script started with folder: $FOLDER_NAME, command: $COMMAND" + log_debug "Script directory: $SCRIPT_DIR" + log_debug "Target directory: $TARGET_DIR" + echo "" + + case "$COMMAND" in + enable) + log_debug "Command: enable" + enable_dev_mode + ;; + disable) + log_debug "Command: disable" + disable_dev_mode + ;; + status) + log_debug "Command: status" + check_status + ;; + update) + log_debug "Command: update" + update_internal_configs + ;; + *) + log_error "Invalid command: $COMMAND" + print_usage + ;; + esac +else + # Multiple folders selected - process each one + echo "" + log_info "Processing ${#FOLDER_NAMES[@]} folder(s) with command: $COMMAND" + echo "" + + for FOLDER_NAME in "${FOLDER_NAMES[@]}"; do + echo "" + log_info "==========================================" + log_info "Processing: $FOLDER_NAME" + log_info "==========================================" + echo "" + + # Validate and set up paths for this folder + validate_and_setup "$FOLDER_NAME" "$COMMAND" + + case "$COMMAND" in + enable) + enable_dev_mode + ;; + disable) + disable_dev_mode + ;; + status) + check_status + ;; + update) + update_internal_configs + ;; + *) + log_error "Invalid command: $COMMAND" + print_usage + exit 1 + ;; + esac + done + + echo "" + log_info "==========================================" + log_info "✓ Completed processing all ${#FOLDER_NAMES[@]} folder(s)" + log_info "==========================================" +fi + diff --git a/horizon/cmd/horizon/main.go b/horizon/cmd/horizon/main.go index b7f3f3f9..291f5d13 100644 --- a/horizon/cmd/horizon/main.go +++ b/horizon/cmd/horizon/main.go @@ -9,7 +9,6 @@ import ( "github.com/Meesho/BharatMLStack/horizon/internal/configs" connectionConfigRouter "github.com/Meesho/BharatMLStack/horizon/internal/connectionconfig/route" deployableRouter "github.com/Meesho/BharatMLStack/horizon/internal/deployable/router" - featureStoreRouter "github.com/Meesho/BharatMLStack/horizon/internal/feature_store/route" inferflowConfig "github.com/Meesho/BharatMLStack/horizon/internal/inferflow/etcd" inferflowRouter "github.com/Meesho/BharatMLStack/horizon/internal/inferflow/route" "github.com/Meesho/BharatMLStack/horizon/internal/middleware" @@ -24,9 +23,6 @@ import ( "github.com/Meesho/BharatMLStack/horizon/pkg/logger" "github.com/Meesho/BharatMLStack/horizon/pkg/metric" "github.com/Meesho/BharatMLStack/horizon/pkg/scheduler" - cacConfig "github.com/Meesho/go-core/config" - pricingclient "github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client" - "github.com/spf13/viper" ) type AppConfig struct { @@ -47,11 +43,11 @@ var ( ) func main() { - cacConfig.InitGlobalConfig(&appConfig) + configs.InitConfig(&appConfig) horizonConfig.InitAll(appConfig.Configs) - infra.InitDBConnectors() - logger.Init() - metric.Init() + infra.InitDBConnectors(appConfig.Configs) + logger.Init(appConfig.Configs) + metric.Init(appConfig.Configs) httpframework.Init(middleware.NewMiddleware().GetMiddleWares()...) etcd.InitFromAppName(&ofsConfig.FeatureRegistry{}, appConfig.Configs.OnlineFeatureStoreAppName, appConfig.Configs) etcd.InitFromAppName(&numerixConfig.NumerixConfigRegistery{}, appConfig.Configs.NumerixAppName, appConfig.Configs) @@ -65,8 +61,6 @@ func main() { predatorRouter.Init() authRouter.Init() ofsRouter.Init() - featureStoreRouter.Init(appConfig.Configs) scheduler.Init(appConfig.Configs) - pricingclient.Init() - httpframework.Instance().Run(":" + strconv.Itoa(viper.GetInt("APP_PORT"))) + httpframework.Instance().Run(":" + strconv.Itoa(appConfig.Configs.AppPort)) } diff --git a/horizon/env.example b/horizon/env.example index 27f06d64..90c0d331 100644 --- a/horizon/env.example +++ b/horizon/env.example @@ -1,3 +1,4 @@ +MEESHO_ENABLED=false APP_NAME=horizon APP_PORT=8082 APP_LOG_LEVEL=INFO diff --git a/horizon/go.mod b/horizon/go.mod index ba64a6ed..f19428fd 100644 --- a/horizon/go.mod +++ b/horizon/go.mod @@ -7,17 +7,13 @@ toolchain go1.24.10 require ( cloud.google.com/go/storage v1.57.2 github.com/DataDog/datadog-go/v5 v5.6.0 - github.com/Meesho/go-core v1.30.20 github.com/Meesho/helix-clients v0.8.1 - github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client v0.0.12 github.com/deckarep/golang-set/v2 v2.8.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/gin-contrib/cors v1.7.3 github.com/gin-gonic/gin v1.10.1 - github.com/go-zookeeper/zk v1.0.4 github.com/gocql/gocql v1.7.0 github.com/google/uuid v1.6.0 - github.com/klauspost/compress v1.18.0 github.com/robfig/cron/v3 v3.0.1 github.com/rs/zerolog v1.34.0 github.com/spf13/viper v1.20.1 @@ -43,8 +39,8 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect + github.com/Meesho/go-core v1.30.20 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/bits-and-blooms/bitset v1.22.0 // indirect github.com/bytedance/sonic v1.13.3 // indirect github.com/bytedance/sonic/loader v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -55,7 +51,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect - github.com/failsafe-go/failsafe-go v0.6.9 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect diff --git a/horizon/go.sum b/horizon/go.sum index b7b01bca..23dd92e3 100644 --- a/horizon/go.sum +++ b/horizon/go.sum @@ -34,8 +34,6 @@ github.com/Meesho/go-core v1.30.20 h1:ijkKm9objd09ZMXGgLMVZpj8Q4+0rvMFY/2hSFFIoM github.com/Meesho/go-core v1.30.20/go.mod h1:Ftn5QRPrCwy/c/m0Mp8zoX0dWcW4PZvtPwyi/qFL6lc= github.com/Meesho/helix-clients v0.8.1 h1:oCmxw62WyXjqMcLacbzU41NQkMyeYGF1V1UQUrAszK8= github.com/Meesho/helix-clients v0.8.1/go.mod h1:S1mVOB7X3Mp6TkoLHPkxba5892WQiA794oVw339yiRE= -github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client v0.0.12 h1:PmYPWRThLACd5mOGAA2O5qBc7/Ygb4IszXq9TJpdgYw= -github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client v0.0.12/go.mod h1:6fvt8B1VeZElMKNotGs11qkdNjWQoTzJrxPVw4g1pI4= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= @@ -43,8 +41,6 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= -github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= -github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bytedance/sonic v1.13.3 h1:MS8gmaH16Gtirygw7jV91pDCN33NyMrPbN7qiYhEsF0= @@ -52,8 +48,6 @@ github.com/bytedance/sonic v1.13.3/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1 github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= -github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= -github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= @@ -81,8 +75,6 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4= github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= -github.com/failsafe-go/failsafe-go v0.6.9 h1:7HWEzOlFOjNerxgWd8onWA2j/aEuqyAtuX6uWya/364= -github.com/failsafe-go/failsafe-go v0.6.9/go.mod h1:zb7xfp1/DJ7Mn4xJhVSZ9F2qmmMEGvYHxEOHYK5SIm0= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -116,8 +108,6 @@ github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= -github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I= -github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= @@ -144,9 +134,6 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.7 h1:zrn2Ee/nWmHulBx5sAV github.com/googleapis/enterprise-certificate-proxy v0.3.7/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= -github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -157,8 +144,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= @@ -258,10 +243,6 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/X go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:NfchwuyNoMcZ5MLHwPrODwUF1HWCXWrL31s8gSAdIKY= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 h1:EtFWSnwW9hGObjkIdmlnWSydO+Qs8OwzfzXLUPg4xOc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0/go.mod h1:QjUEoiGCPkvFZ/MjK6ZZfNOS6mfVEVKYE99dFhuN2LI= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 h1:rixTyDGXFxRy1xzhKrotaHy3/KXdPhlWARrCgK+eqUY= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0/go.mod h1:dowW6UsM9MKbJq5JTz2AMVp3/5iW5I/TStsk8S+CfHw= go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= @@ -272,8 +253,6 @@ go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFh go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= -go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os= -go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/horizon/internal/configs/app_config.go b/horizon/internal/configs/app_config.go index 122a4073..7a6ce01b 100644 --- a/horizon/internal/configs/app_config.go +++ b/horizon/internal/configs/app_config.go @@ -103,7 +103,11 @@ type Configs struct { PricingFeatureRetrievalGrpcPlainText bool `mapstructure:"pricing_feature_retrieval_grpc_plain_text"` PricingFeatureRetrievalTimeoutMs string `mapstructure:"pricing_feature_retrieval_timeout_in_ms"` - OnlineFeatureStoreAppName string `mapstructure:"online_feature_store_app_name"` + OnlineFeatureStoreAppName string `mapstructure:"online_feature_store_app_name"` + ScyllaActiveConfIds string `mapstructure:"scylla_active_conf_ids"` + RedisFailoverActiveConfIds string `mapstructure:"redis_failover_active_conf_ids"` + + IsMeeshoEnabled bool `mapstructure:"is_meesho_enabled"` } type DynamicConfigs struct{} diff --git a/horizon/internal/configs/config_init_stub.go b/horizon/internal/configs/config_init_stub.go new file mode 100644 index 00000000..e6bedad7 --- /dev/null +++ b/horizon/internal/configs/config_init_stub.go @@ -0,0 +1,35 @@ +//go:build !meesho + +package configs + +import ( + "log" + + "github.com/Meesho/BharatMLStack/horizon/pkg/config" + "github.com/spf13/viper" +) + +// ConfigHolder interface for app config +type ConfigHolder interface { + GetStaticConfig() interface{} + GetDynamicConfig() interface{} +} + +// InitConfig initializes configuration based on MEESHO_ENABLED flag +func InitConfig(configHolder ConfigHolder) { + config.InitEnv() + + staticConfig := configHolder.GetStaticConfig() + cfg, ok := staticConfig.(*Configs) + if !ok { + log.Fatal("Failed to cast static config to *Configs") + } + + // Bind environment variables to config keys + // This maps APP_NAME (env) -> app_name (config key) + if err := viper.Unmarshal(cfg); err != nil { + log.Fatalf("Failed to unmarshal config from environment: %v", err) + } + + log.Println("Configuration loaded from environment variables") +} diff --git a/horizon/internal/externalcall/init.go b/horizon/internal/externalcall/init.go index 99275dc0..e2e88301 100644 --- a/horizon/internal/externalcall/init.go +++ b/horizon/internal/externalcall/init.go @@ -10,5 +10,6 @@ func Init(config configs.Configs) { InitRingmasterClient(config.RingmasterBaseUrl, config.RingmasterMiscSession, config.RingmasterAuthorization, config.RingmasterEnvironment, config.RingmasterApiKey) // Initialize feature validation client with config-based URLs InitFeatureValidationClient(config) - // Pricing client is initialized in main.go + // Initialize pricing client - provides both raw data types and RTP format + PricingClient.InitPricingClient() } diff --git a/horizon/internal/externalcall/pricing_client.go b/horizon/internal/externalcall/pricing_client.go deleted file mode 100644 index 0e37bc4b..00000000 --- a/horizon/internal/externalcall/pricing_client.go +++ /dev/null @@ -1,153 +0,0 @@ -package externalcall - -import ( - "context" - "fmt" - "strings" - - pricingclient "github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client" - "github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client/models" - "github.com/rs/zerolog/log" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// PricingFeatureClient interface for pricing feature validation -type PricingFeatureClient interface { - GetDataTypes(entity string) (*PricingDataTypesResponse, error) -} - -// PricingDataTypesResponse represents the response from pricing feature service -type PricingDataTypesResponse struct { - Entities []struct { - Entity string `json:"entity"` - FeatureGroups []struct { - Label string `json:"label"` - Features []string `json:"features"` - DataType string `json:"dataType"` - } `json:"featureGroups"` - } `json:"entities"` -} - -type pricingFeatureClientImpl struct{} - -var PricingClient PricingFeatureClient = &pricingFeatureClientImpl{} - -// GetDataTypes calls the pricing service to get data types for features -func (p *pricingFeatureClientImpl) GetDataTypes(entity string) (*PricingDataTypesResponse, error) { - log.Info().Msgf("Calling pricing service for entity: %s", entity) - - // Create request for the pricing client (no entity parameter needed based on your requirement) - clientInstance := pricingclient.Instance(1) - - // Call the real pricing client - pricingResponse, err := clientInstance.GetDataTypes(context.Background(), &models.GetDataTypesRequest{}, map[string]string{}) - if err != nil { - log.Error().Err(err).Msgf("Failed to call pricing service for entity: %s", entity) - - // Check if it's a gRPC connection error and provide fallback - if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable { - log.Warn().Msgf("Pricing service unavailable for entity: %s, returning fallback response", entity) - return p.getFallbackResponse(entity), nil - } - - return nil, fmt.Errorf("failed to call pricing service: %w", err) - } - - // Convert the pricing client response to our response format - response := &PricingDataTypesResponse{ - Entities: make([]struct { - Entity string `json:"entity"` - FeatureGroups []struct { - Label string `json:"label"` - Features []string `json:"features"` - DataType string `json:"dataType"` - } `json:"featureGroups"` - }, len(pricingResponse.Entities)), - } - - // Map the response from pricing client to our format - for i, pricingEntity := range pricingResponse.Entities { - response.Entities[i].Entity = pricingEntity.Entity - response.Entities[i].FeatureGroups = make([]struct { - Label string `json:"label"` - Features []string `json:"features"` - DataType string `json:"dataType"` - }, len(pricingEntity.FeatureGroups)) - - for j, pricingFeatureGroup := range pricingEntity.FeatureGroups { - response.Entities[i].FeatureGroups[j].Label = pricingFeatureGroup.Label - response.Entities[i].FeatureGroups[j].Features = pricingFeatureGroup.Features - response.Entities[i].FeatureGroups[j].DataType = pricingFeatureGroup.DataType - } - } - - log.Info().Msgf("Successfully retrieved pricing data for entity %s with %d entities", entity, len(response.Entities)) - return response, nil -} - -// ValidatePricingFeatureExists checks if a pricing feature exists in the response -func ValidatePricingFeatureExists(featureName string, response *PricingDataTypesResponse) bool { - if response == nil { - return false - } - - // Parse feature name: entity:feature_group:feature - featureParts := strings.Split(featureName, ":") - if len(featureParts) != 3 { - return false - } - - expectedEntity := featureParts[0] // entity (e.g., "user_product") - expectedFG := featureParts[1] // feature group (e.g., "real_time_product_pricing") - expectedFeature := featureParts[2] // feature name (e.g., "rto_charge") - - // Search through entities and feature groups - for _, entity := range response.Entities { - // Check if this is the correct entity - if entity.Entity == expectedEntity { - for _, featureGroup := range entity.FeatureGroups { - // Check if this is the correct feature group - if featureGroup.Label == expectedFG { - // Check if feature exists in this feature group - for _, f := range featureGroup.Features { - if f == expectedFeature { - return true - } - } - } - } - } - } - - return false -} - -// getFallbackResponse provides a fallback response when pricing service is unavailable -func (p *pricingFeatureClientImpl) getFallbackResponse(entity string) *PricingDataTypesResponse { - return &PricingDataTypesResponse{ - Entities: []struct { - Entity string `json:"entity"` - FeatureGroups []struct { - Label string `json:"label"` - Features []string `json:"features"` - DataType string `json:"dataType"` - } `json:"featureGroups"` - }{ - { - Entity: entity, - FeatureGroups: []struct { - Label string `json:"label"` - Features []string `json:"features"` - DataType string `json:"dataType"` - }{ - { - Label: "fallback_feature_group", - Features: []string{"fallback_feature"}, - DataType: "DataTypeString", - }, - }, - }, - }, - } -} diff --git a/horizon/internal/externalcall/pricing_client_stub.go b/horizon/internal/externalcall/pricing_client_stub.go new file mode 100644 index 00000000..848b489b --- /dev/null +++ b/horizon/internal/externalcall/pricing_client_stub.go @@ -0,0 +1,58 @@ +//go:build !meesho + +package externalcall + +import ( + "errors" + + "github.com/rs/zerolog/log" +) + +// PricingFeatureClient interface for pricing feature validation +type PricingFeatureClient interface { + GetDataTypes(entity string) (*PricingDataTypesResponse, error) + GetFeatureGroupDataTypeMap() (map[string]string, error) + InitPricingClient() +} + +const ( + RTPColonDelimiter = ":" + RTPUnderscoreDelimiter = "_" +) + +// PricingDataTypesResponse represents the response from pricing feature service +type PricingDataTypesResponse struct { + Entities []struct { + Entity string `json:"entity"` + FeatureGroups []struct { + Label string `json:"label"` + Features []string `json:"features"` + DataType string `json:"dataType"` + } `json:"featureGroups"` + } `json:"entities"` +} + +type pricingFeatureClientImpl struct{} + +var PricingClient PricingFeatureClient = &pricingFeatureClientImpl{} + +// GetDataTypes calls the pricing service to get data types for features +func (p *pricingFeatureClientImpl) GetDataTypes(entity string) (*PricingDataTypesResponse, error) { + return nil, errors.New("Pricing client GetDataTypes is not supported without meesho build tag") +} + +// GetFeatureGroupDataTypeMap returns an error for stub implementation +func (p *pricingFeatureClientImpl) GetFeatureGroupDataTypeMap() (map[string]string, error) { + log.Warn().Msg("Pricing client GetFeatureGroupDataTypeMap is not supported without meesho build tag") + return nil, errors.New("Pricing client GetFeatureGroupDataTypeMap is not supported without meesho build tag") +} + +// ValidatePricingFeatureExists checks if a pricing feature exists in the response +func ValidatePricingFeatureExists(featureName string, response *PricingDataTypesResponse) bool { + log.Warn().Msgf("Pricing client ValidatePricingFeatureExists is not supported without meesho build tag") + return false +} + +func (p *pricingFeatureClientImpl) InitPricingClient() { + log.Warn().Msgf("Pricing client Init is not supported without meesho build tag") +} diff --git a/horizon/internal/feature_store/controller/fs_config.go b/horizon/internal/feature_store/controller/fs_config.go deleted file mode 100644 index 71e51646..00000000 --- a/horizon/internal/feature_store/controller/fs_config.go +++ /dev/null @@ -1,31 +0,0 @@ -package controller - -import ( - feature_registry "github.com/Meesho/BharatMLStack/horizon/internal/feature_store/handler" - "github.com/gin-gonic/gin" - "github.com/rs/zerolog/log" -) - -type FsConfigController interface { - GetFsConfigs(ctx *gin.Context) -} - -type FsConfigControllerV1 struct { - FsConfigHandler feature_registry.FsConfigHandler -} - -func NewFsConfigControllerGenerator(handler feature_registry.FsConfigHandler) *FsConfigControllerV1 { - if handler == nil { - log.Panic().Msg("application config handler cannot be nil") - } - return &FsConfigControllerV1{ - FsConfigHandler: handler, - } -} - -func (f *FsConfigControllerV1) GetFsConfigs(ctx *gin.Context) { - if apiError := f.FsConfigHandler.GetFsConfigs(ctx); apiError != nil { - ctx.Error(apiError) - return - } -} diff --git a/horizon/internal/feature_store/handler/fs_config.go b/horizon/internal/feature_store/handler/fs_config.go deleted file mode 100644 index ed818585..00000000 --- a/horizon/internal/feature_store/handler/fs_config.go +++ /dev/null @@ -1,121 +0,0 @@ -package handler - -import ( - "encoding/json" - "fmt" - "sync" - - "github.com/Meesho/BharatMLStack/horizon/internal/configs" - "github.com/Meesho/BharatMLStack/horizon/pkg/api" - "github.com/Meesho/BharatMLStack/horizon/pkg/zookeeper" - "github.com/gin-gonic/gin" - "github.com/klauspost/compress/zstd" - "github.com/rs/zerolog/log" -) - -type FsConfigHandler interface { - GetFsConfigs(ctx *gin.Context) *api.Error -} - -type FsConfig struct { - fsConfigMap map[string][]byte - zkConfig *Conf -} - -var Mutex sync.Mutex - -func NewFsConfigHandlerGenerator() *FsConfig { - return &FsConfig{ - fsConfigMap: make(map[string][]byte), - zkConfig: &Conf{}, - } -} -func (f *FsConfig) GetFsConfigs(ctx *gin.Context) *api.Error { - data, ok := f.fsConfigMap["fs_config"] - if !ok { - return api.NewInternalServerError("Failed to retrieve configs") - } - response := &FsConfigResponse{ - Data: data, - } - decoder, err := zstd.NewReader(nil) - if err != nil { - log.Error().Err(err).Msg("Failed to create zstd decoder") - - } - decompressedBytes, err := decoder.DecodeAll(data, nil) - if err != nil { - log.Error().Err(err).Msg("Failed to decompress bytes") - - } - - zkconfig1 := Conf{} - // Unmarshal the decompressed bytes into zkConfig - err = json.Unmarshal(decompressedBytes, &zkconfig1) - ctx.JSON(200, response) - return nil -} - -func (f *FsConfig) InitFsConfig(config configs.Configs) { - zookeeper.InitZKConnection(config) - zookeeper.WatchZkNode() - go f.ListenZk() - decoder, err := zstd.NewReader(nil) - if err != nil { - log.Error().Err(err).Msg("Failed to create zstd decoder") - return - } - decompressedBytes, err := decoder.DecodeAll(f.fsConfigMap["fs_config"], nil) - if err != nil { - log.Error().Err(err).Msg("Failed to decompress bytes") - return - } - - zkconfig1 := Conf{} - // Unmarshal the decompressed bytes into zkConfig - err = json.Unmarshal(decompressedBytes, &zkconfig1) - - log.Info().Msg("Initializing FS Config Handler") -} - -func (f *FsConfig) ListenZk() { - for { - select { - case data := <-zookeeper.ZKChannel: - var localZkConf *Conf - var compressedBytes []byte - err := json.Unmarshal(data, &localZkConf) - if err != nil { - log.Error().Err(err).Msg("Failed to unmarshal JSON") - } else { - Mutex.Lock() - f.zkConfig = localZkConf - - // Convert ZkConf to byte array - zkConfBytes, err := json.Marshal(f.zkConfig) - if err != nil { - log.Error().Err(err).Msg("Failed to marshal ZkConf") - } else { - // Print size before compression - fmt.Printf("Size before compression: %d bytes\n", len(zkConfBytes)) - - // Apply zstd compression - encoder, err := zstd.NewWriter(nil) - if err != nil { - log.Error().Err(err).Msg("Failed to create zstd encoder") - } - compressedBytes = encoder.EncodeAll(zkConfBytes, nil) - - // Print size after compression - fmt.Printf("Size after compression: %d bytes\n", len(compressedBytes)) - } - Mutex.Unlock() - - } - - if len(compressedBytes) != 0 { - f.fsConfigMap["fs_config"] = compressedBytes - } - } - } -} diff --git a/horizon/internal/feature_store/handler/model.go b/horizon/internal/feature_store/handler/model.go deleted file mode 100644 index a710277a..00000000 --- a/horizon/internal/feature_store/handler/model.go +++ /dev/null @@ -1,75 +0,0 @@ -package handler - -type Conf struct { - EntityConf map[string]EntityConf `json:"entities"` - Storage Storage `json:"storage"` - InMemory map[string]InMemoryConf `json:"in-memory"` - McacheEnabled ExperimentControl `json:"mcache-enabled"` //for all clusters - ErrorLoggingPercentage string `json:"error-logging-percentage"` -} - -type InMemoryConf struct { - FeatureGroupConf map[string]string `json:"feature-groups"` -} - -type EntityConf struct { - Label string `json:"label"` - Keys map[int]Keys `json:"keys"` - FeatureGroupConf map[string]FeatureGroupConf `json:"feature-groups"` - InMemoryEnabled string `json:"in-memory-enabled"` - DistributedCacheEnabled string `json:"distributed-cache-enabled"` - RedisCacheTTL string `json:"redis-cache-ttl"` - DistributeCacheInfraVersion string `json:"distributed-cache-infra-version"` - DistributedCacheWritePercentage string `json:"distributed-cache-write-percentage"` -} - -type Keys struct { - Sequence string `json:"sequence"` - EntityLabel string `json:"entity-label"` - ColumnLabel string `json:"column-label"` -} - -type FeatureGroupConf struct { - FeatureColumnConf map[string]FeatureColumnConf `json:"columns"` - StoreId string `json:"storeId"` - Label string `json:"label"` - DataType string `json:"data-type"` - Ttl string `json:"ttl"` - JobId string `json:"jobId"` - MaxSizeByte string `json:"max-size-byte"` -} - -type FeatureColumnConf struct { - ColumnLabel string `json:"column-label"` - ActiveConfig string `json:"active-config"` - CurrentSizeByte string `json:"current-size-byte"` - FeatureConf map[string]FeatureConf `json:"config"` -} - -type FeatureConf struct { - FeatureLabels string `json:"feature-labels"` - FeatureDefaultValues string `json:"feature-default-values"` -} - -type Storage struct { - RateLimiter RateLimiter `json:"rate-limiter"` - RedisCacheTTL string `json:"redis-cache-ttl"` - Stores map[string]string `json:"stores"` - DefaultScyllaPercentage string `json:"default-scylla-percentage"` -} - -type RateLimiter struct { - PermitsPerSecond string `json:"permits-per-second"` - WarmupPeriod string `json:"warmup-period"` -} - -type ExperimentControl struct { - Enabled string `json:"enabled"` - PercentageEnabled string `json:"percentage-enabled"` - Deadline string `json:"deadline"` - JitterPercentage string `json:"jitter-percentage"` -} - -type FsConfigResponse struct { - Data []byte `json:"data"` -} diff --git a/horizon/internal/feature_store/route/router.go b/horizon/internal/feature_store/route/router.go deleted file mode 100644 index a99e0446..00000000 --- a/horizon/internal/feature_store/route/router.go +++ /dev/null @@ -1,28 +0,0 @@ -package route - -import ( - "sync" - - "github.com/Meesho/BharatMLStack/horizon/internal/configs" - "github.com/Meesho/BharatMLStack/horizon/internal/feature_store/controller" - feature_registry "github.com/Meesho/BharatMLStack/horizon/internal/feature_store/handler" - "github.com/Meesho/BharatMLStack/horizon/pkg/httpframework" -) - -var initFeatureStoreRouterOnce sync.Once - -func Init(config configs.Configs) { - initFeatureStoreRouterOnce.Do(func() { - api := httpframework.Instance().Group("/api") - { - v1 := api.Group("/1.0") - - fsConfigHandler := feature_registry.NewFsConfigHandlerGenerator() - fsConfigHandler.InitFsConfig(config) - fsConfig := v1.Group("/fs-config") - { - fsConfig.GET("/fetch", controller.NewFsConfigControllerGenerator(fsConfigHandler).GetFsConfigs) - } - } - }) -} diff --git a/horizon/internal/inferflow/etcd/models.go b/horizon/internal/inferflow/etcd/models.go index 44921adf..b72d96fb 100644 --- a/horizon/internal/inferflow/etcd/models.go +++ b/horizon/internal/inferflow/etcd/models.go @@ -43,6 +43,12 @@ type PredatorOutput struct { DataType string `json:"data_type"` } +type RoutingConfig struct { + ModelName string `json:"model_name"` + ModelEndpoint string `json:"model_endpoint"` + RoutingPercentage float32 `json:"routing_percentage"` +} + type PredatorComponent struct { Component string `json:"component"` ComponentID string `json:"component_id"` @@ -53,6 +59,7 @@ type PredatorComponent struct { BatchSize int `json:"batch_size"` Inputs []PredatorInput `json:"inputs"` Outputs []PredatorOutput `json:"outputs"` + RoutingConfig []RoutingConfig `json:"route_config,omitempty"` } type RTPComponent struct { @@ -107,7 +114,7 @@ type ComponentConfig struct { CacheTTL int `json:"cache_ttl"` CacheVersion int `json:"cache_version"` FeatureComponents []FeatureComponent `json:"feature_components"` - RTPComponents []RTPComponent `json:"real_time_pricing_feature_components"` + RTPComponents []RTPComponent `json:"real_time_pricing_feature_components,omitempty"` PredatorComponents []PredatorComponent `json:"predator_components"` NumerixComponents []NumerixComponent `json:"numerix_components"` } diff --git a/horizon/internal/inferflow/handler/adaptor.go b/horizon/internal/inferflow/handler/adaptor.go index 6f264ee5..52665062 100644 --- a/horizon/internal/inferflow/handler/adaptor.go +++ b/horizon/internal/inferflow/handler/adaptor.go @@ -185,6 +185,21 @@ func AdaptToDBPredatorComponent(inferflowConfig InferflowConfig) []dbModel.Preda } } + routingConfig := make([]dbModel.RoutingConfig, len(ranker.RoutingConfig)) + + for i, config := range ranker.RoutingConfig { + + routingConfig[i] = dbModel.RoutingConfig{ + + ModelName: config.ModelName, + + ModelEndpoint: config.ModelEndpoint, + + RoutingPercentage: config.RoutingPercentage, + } + + } + predatorComp := dbModel.PredatorComponent{ Component: ranker.Component, ComponentID: ranker.ComponentID, @@ -195,6 +210,7 @@ func AdaptToDBPredatorComponent(inferflowConfig InferflowConfig) []dbModel.Preda BatchSize: ranker.BatchSize, Inputs: dbInputs, Outputs: dbOutputs, + RoutingConfig: routingConfig, } predatorComponents = append(predatorComponents, predatorComp) } @@ -349,12 +365,15 @@ func AdaptToDBOnboardPayload(onboardPayload OnboardPayload) dbModel.OnboardPaylo for i, ranker := range onboardPayload.Rankers { dbOnboardPayload.Rankers[i] = dbModel.OnboardRanker{ - ModelName: ranker.ModelName, - Calibration: ranker.Calibration, - EndPoint: ranker.EndPoint, - EntityID: ranker.EntityID, - Inputs: make([]dbModel.PredatorInput, len(ranker.Inputs)), - Outputs: make([]dbModel.PredatorOutput, len(ranker.Outputs)), + ModelName: ranker.ModelName, + Calibration: ranker.Calibration, + EndPoint: ranker.EndPoint, + EntityID: ranker.EntityID, + Inputs: make([]dbModel.PredatorInput, len(ranker.Inputs)), + Outputs: make([]dbModel.PredatorOutput, len(ranker.Outputs)), + BatchSize: ranker.BatchSize, + Deadline: ranker.Deadline, + RoutingConfig: make([]dbModel.RoutingConfig, len(ranker.RoutingConfig)), } for j, input := range ranker.Inputs { dbOnboardPayload.Rankers[i].Inputs[j] = dbModel.PredatorInput{ @@ -364,6 +383,14 @@ func AdaptToDBOnboardPayload(onboardPayload OnboardPayload) dbModel.OnboardPaylo DataType: input.DataType, } } + + for k, config := range ranker.RoutingConfig { + dbOnboardPayload.Rankers[i].RoutingConfig[k] = dbModel.RoutingConfig{ + ModelName: config.ModelName, + ModelEndpoint: config.ModelEndpoint, + RoutingPercentage: config.RoutingPercentage, + } + } for j, output := range ranker.Outputs { dbOnboardPayload.Rankers[i].Outputs[j] = dbModel.PredatorOutput{ Name: output.Name, @@ -417,12 +444,15 @@ func AdaptFromDbToOnboardPayload(dbOnboardPayload dbModel.OnboardPayload) Onboar for i, predatorComponent := range dbOnboardPayload.Rankers { onboardPayload.Rankers = append(onboardPayload.Rankers, Ranker{ - ModelName: predatorComponent.ModelName, - Calibration: predatorComponent.Calibration, - EndPoint: predatorComponent.EndPoint, - Inputs: make([]Input, len(predatorComponent.Inputs)), - Outputs: make([]Output, len(predatorComponent.Outputs)), - EntityID: predatorComponent.EntityID, + ModelName: predatorComponent.ModelName, + Calibration: predatorComponent.Calibration, + EndPoint: predatorComponent.EndPoint, + Inputs: make([]Input, len(predatorComponent.Inputs)), + Outputs: make([]Output, len(predatorComponent.Outputs)), + EntityID: predatorComponent.EntityID, + BatchSize: predatorComponent.BatchSize, + Deadline: predatorComponent.Deadline, + RoutingConfig: make([]RoutingConfig, len(predatorComponent.RoutingConfig)), }) for j, input := range predatorComponent.Inputs { onboardPayload.Rankers[i].Inputs[j] = Input{ @@ -440,6 +470,14 @@ func AdaptFromDbToOnboardPayload(dbOnboardPayload dbModel.OnboardPayload) Onboar DataType: output.DataType, } } + + for k, config := range predatorComponent.RoutingConfig { + onboardPayload.Rankers[i].RoutingConfig[k] = RoutingConfig{ + ModelName: config.ModelName, + ModelEndpoint: config.ModelEndpoint, + RoutingPercentage: config.RoutingPercentage, + } + } } for _, reRanker := range dbOnboardPayload.ReRankers { @@ -507,6 +545,16 @@ func AdaptFromDbToPredatorComponent(dbPredatorComponents []dbModel.PredatorCompo } } + routingConfig := make([]RoutingConfig, len(predatorComponent.RoutingConfig)) + + for i, config := range predatorComponent.RoutingConfig { + routingConfig[i] = RoutingConfig{ + ModelName: config.ModelName, + ModelEndpoint: config.ModelEndpoint, + RoutingPercentage: config.RoutingPercentage, + } + } + predatorComponent := PredatorComponent{ Component: predatorComponent.Component, ComponentID: predatorComponent.ComponentID, @@ -517,6 +565,7 @@ func AdaptFromDbToPredatorComponent(dbPredatorComponents []dbModel.PredatorCompo BatchSize: predatorComponent.BatchSize, Inputs: dbInputs, Outputs: dbOutputs, + RoutingConfig: routingConfig, } predatorComponents = append(predatorComponents, predatorComponent) } @@ -685,6 +734,15 @@ func AdaptToEtcdPredatorComponent(dbPredatorComponents []dbModel.PredatorCompone DataType: output.DataType, } } + routingConfig := make([]etcdModel.RoutingConfig, len(predatorComponent.RoutingConfig)) + + for i, config := range predatorComponent.RoutingConfig { + routingConfig[i] = etcdModel.RoutingConfig{ + ModelName: config.ModelName, + ModelEndpoint: config.ModelEndpoint, + RoutingPercentage: config.RoutingPercentage, + } + } predatorComponent := etcdModel.PredatorComponent{ Component: predatorComponent.Component, @@ -696,6 +754,7 @@ func AdaptToEtcdPredatorComponent(dbPredatorComponents []dbModel.PredatorCompone BatchSize: predatorComponent.BatchSize, Inputs: dbInputs, Outputs: dbOutputs, + RoutingConfig: routingConfig, } predatorComponents = append(predatorComponents, predatorComponent) } diff --git a/horizon/internal/inferflow/handler/config_builder.go b/horizon/internal/inferflow/handler/config_builder.go index a5ecbedc..c9382029 100644 --- a/horizon/internal/inferflow/handler/config_builder.go +++ b/horizon/internal/inferflow/handler/config_builder.go @@ -1,17 +1,16 @@ package handler import ( - "context" "fmt" "sort" "strconv" "strings" + "github.com/Meesho/BharatMLStack/horizon/internal/externalcall" + inferflow "github.com/Meesho/BharatMLStack/horizon/internal/inferflow" ofsHandler "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store/handler" etcd "github.com/Meesho/BharatMLStack/horizon/internal/inferflow/etcd" - "github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client" - "github.com/Meesho/price-aggregator-go/pricingfeatureretrieval/client/models" mapset "github.com/deckarep/golang-set/v2" ) @@ -38,11 +37,10 @@ const ( featureClassInvalid = "invalid" COMPONENT_NAME_PREFIX = "composite_key_gen_" FEATURE_INITIALIZER = "feature_initializer" - rtpClientVersion = 1 ) func (m *InferFlow) GetInferflowConfig(request InferflowOnboardRequest, token string) (InferflowConfig, error) { - client.Init() + // RTP client is initialized in externalcall.Init() entityIDs := extractEntityIDs(request) featureList, featureToDataType, rtpFeatures, pcvrCalibrationFeatures, pctrCalibrationFeatures, predatorAndNumerixOutputsToDataType, offlineToOnlineMapping, err := GetFeatureList(request, m.EtcdConfig, token, entityIDs) @@ -98,7 +96,7 @@ func (m *InferFlow) GetInferflowConfig(request InferflowOnboardRequest, token st // and returns a set of features, a map of feature to data type, a map of offline feature to online feature // and an error if any. func GetFeatureList(request InferflowOnboardRequest, etcdConfig etcd.Manager, token string, entityIDs map[string]bool) (mapset.Set[string], map[string]string, mapset.Set[string], mapset.Set[string], mapset.Set[string], map[string]string, map[string]string, error) { - initialFeatures, featureToDataType, predatorAndNumerixOutputsToDataType := extractFeatures(request, entityIDs) + initialFeatures, featureToDataType, predatorAndIrisOutputsToDataType := extractFeatures(request, entityIDs) offlineFeatures, onlineFeatures, _, rtpFeatures, pctrCalibrationFeatures, pcvrCalibrationFeatures, newFeatureToDataType, err := classifyFeatures(initialFeatures, featureToDataType) if err != nil { @@ -124,26 +122,48 @@ func GetFeatureList(request InferflowOnboardRequest, etcdConfig etcd.Manager, to features.Add(f) } - rtpComponentFeatures, rtpComponentFeatureToDataType, err := fetchRTPComponentFeatures(rtpFeatures, etcdConfig) + // Fetch RTP registry once for classification + rtpRegistry, err := GetRTPFeatureGroupDataTypeMap() + if err != nil && inferflow.IsMeeshoEnabled { + return nil, nil, nil, nil, nil, nil, nil, fmt.Errorf("failed to get RTP registry: %w", err) + } + + // Fetch component features from RTP components + rtpComponentFSFeatures, rtpComponentRTPFeatures, rtpComponentFeatureToDataType, err := fetchRTPComponentFeaturesWithClassification(rtpFeatures, etcdConfig, rtpRegistry) if err != nil { return nil, nil, nil, nil, nil, nil, nil, err } - for _, f := range rtpComponentFeatures.ToSlice() { + + // Add FS features to the main features set + for _, f := range rtpComponentFSFeatures.ToSlice() { features.Add(f) } + // Add newly discovered RTP features to rtpFeatures set + for _, f := range rtpComponentRTPFeatures.ToSlice() { + rtpFeatures.Add(f) + } + for f, dtype := range rtpComponentFeatureToDataType { featureToDataType[f] = dtype } - componentFeatures, newfeatureToDataType, err := fetchComponentFeatures(features, pctrCalibrationFeatures, pcvrCalibrationFeatures, etcdConfig, request.Payload.RealEstate, token) + // Fetch component features from regular FS components + componentFSFeatures, componentRTPFeatures, newfeatureToDataType, err := fetchComponentFeaturesWithClassification(features, pctrCalibrationFeatures, pcvrCalibrationFeatures, etcdConfig, request.Payload.RealEstate, token, rtpRegistry) if err != nil { return nil, nil, nil, nil, nil, nil, nil, err } - for _, f := range componentFeatures.ToSlice() { + + // Add FS features to the main features set + for _, f := range componentFSFeatures.ToSlice() { features.Add(f) } + // Add newly discovered RTP features to rtpFeatures set + for _, f := range componentRTPFeatures.ToSlice() { + rtpFeatures.Add(f) + } + for f, dtype := range newfeatureToDataType { featureToDataType[f] = dtype } @@ -156,7 +176,7 @@ func GetFeatureList(request InferflowOnboardRequest, etcdConfig etcd.Manager, to return nil, nil, nil, nil, nil, nil, nil, err } - return features, featureToDataType, rtpFeatures, pctrCalibrationFeatures, pcvrCalibrationFeatures, predatorAndNumerixOutputsToDataType, offlineToOnlineMapping, nil + return features, featureToDataType, rtpFeatures, pctrCalibrationFeatures, pcvrCalibrationFeatures, predatorAndIrisOutputsToDataType, offlineToOnlineMapping, nil } func extractEntityIDs(request InferflowOnboardRequest) map[string]bool { @@ -304,7 +324,7 @@ func fetchMissingDatatypes( // Query RTP API once for all RTP datatypes if rtpFeaturesToFetch.Cardinality() > 0 { - rtpDataTypeMap, err := GetRTPFeatureGroupDataTypeMap(token) + rtpDataTypeMap, err := GetRTPFeatureGroupDataTypeMap() if err == nil { for _, feature := range rtpFeaturesToFetch.ToSlice() { if dataType, exists := rtpDataTypeMap[feature]; exists { @@ -373,9 +393,12 @@ func classifyFeatures( pcvrCalibrationFeatures := mapset.NewSet[string]() newFeatureToDataType := make(map[string]string) - add := func(targetSet mapset.Set[string], name, originalFeature string) { - targetSet.Add(name) + add := func(name, originalFeature string, featureType string) error { + if err := AddFeatureToSet(&defaultFeatures, &onlineFeatures, &offlineFeatures, &rtpFeatures, &pctrCalibrationFeatures, &pcvrCalibrationFeatures, name, featureType); err != nil { + return fmt.Errorf("error classifying feature: %w", err) + } newFeatureToDataType[name] = featureDataTypes[originalFeature] + return nil } for feature := range featureList.Iter() { @@ -384,25 +407,39 @@ func classifyFeatures( return nil, nil, nil, nil, nil, nil, nil, err } - switch featureType { - case featureClassOffline: - add(offlineFeatures, transformedFeature, feature) - case featureClassOnline: - add(onlineFeatures, transformedFeature, feature) - case featureClassDefault: - add(defaultFeatures, transformedFeature, feature) - case featureClassRtp: - add(rtpFeatures, transformedFeature, feature) - case featureClassPCVRCalibration: - add(pcvrCalibrationFeatures, transformedFeature, feature) - case featureClassPCTRCalibration: - add(pctrCalibrationFeatures, transformedFeature, feature) + if err := add(transformedFeature, feature, featureType); err != nil { + return nil, nil, nil, nil, nil, nil, nil, err } } return offlineFeatures, onlineFeatures, defaultFeatures, rtpFeatures, pctrCalibrationFeatures, pcvrCalibrationFeatures, newFeatureToDataType, nil } +func AddFeatureToSet(defaultFeatures, onlineFeatures, offlineFeatures, rtpFeatures, pctrCalibrationFeatures, pcvrCalibrationFeatures *mapset.Set[string], feature string, featureType string) error { + allSets := map[string]*mapset.Set[string]{ + featureClassDefault: defaultFeatures, + featureClassOnline: onlineFeatures, + featureClassOffline: offlineFeatures, + featureClassRtp: rtpFeatures, + featureClassPCTRCalibration: pctrCalibrationFeatures, + featureClassPCVRCalibration: pcvrCalibrationFeatures, + } + + for setType, set := range allSets { + if setType != featureType && (*set).Contains(feature) { + return fmt.Errorf("feature '%s' already exists in %s features, cannot add to %s features", feature, setType, featureType) + } + } + + targetSet, exists := allSets[featureType] + if !exists { + return fmt.Errorf("invalid feature type '%s' for feature '%s'", featureType, feature) + } + + (*targetSet).Add(feature) + return nil +} + // transformFeature transforms the feature to either online, offline or default feature // and returns the transformed feature and the feature type. The feature can be of these given types: // 1. PARENT_OFFLINE_FEATURE|FEATURE @@ -475,49 +512,95 @@ func mapOfflineFeatures(offlineFeatureList mapset.Set[string], token string) (ma return GetOnlineFeatureMapping(offlineFeatureList, token) } -func fetchRTPComponentFeatures(rtpFeatures mapset.Set[string], etcdConfig etcd.Manager) (mapset.Set[string], map[string]string, error) { +func fetchRTPComponentFeaturesWithClassification(rtpFeatures mapset.Set[string], etcdConfig etcd.Manager, rtpRegistry map[string]string) (mapset.Set[string], mapset.Set[string], map[string]string, error) { componentList := getComponentList(rtpFeatures, nil, nil) - componentFeatures := mapset.NewSet[string]() + fsFeatures := mapset.NewSet[string]() + newRTPFeatures := mapset.NewSet[string]() featureToDataType := make(map[string]string) for _, component := range componentList.ToSlice() { componentData := etcdConfig.GetComponentData(component) if componentData == nil { - return nil, nil, fmt.Errorf("RTP Component: componentData for '%s' not found in registry", component) + return nil, nil, nil, fmt.Errorf("RTP Component: componentData for '%s' not found in registry", component) } for _, pair := range componentData.FSIdSchemaToValueColumns { if strings.Contains(pair.ValueCol, COLON_DELIMITER) { - componentFeatures.Add(pair.ValueCol) + // Check if this is an RTP feature or FS feature + isRTPFeature := false + + // Check direct match in RTP registry + if _, exists := rtpRegistry[pair.ValueCol]; exists { + isRTPFeature = true + } else { + // Check with prefix removed (for features like "parent:entity:group:feature") + parts := strings.Split(pair.ValueCol, COLON_DELIMITER) + if len(parts) == 4 { + withoutPrefix := strings.Join(parts[1:], COLON_DELIMITER) + if _, exists := rtpRegistry[withoutPrefix]; exists { + isRTPFeature = true + } + } + } + + if isRTPFeature { + newRTPFeatures.Add(pair.ValueCol) + } else { + fsFeatures.Add(pair.ValueCol) + } featureToDataType[pair.ValueCol] = pair.DataType } } } - return componentFeatures, featureToDataType, nil + return fsFeatures, newRTPFeatures, featureToDataType, nil } -// fetchComponentFeatures fetches the component features from the etcd config -// and returns a set of component features and a map of component feature to data type -func fetchComponentFeatures(features mapset.Set[string], pctrCalibrationFeatures mapset.Set[string], pcvrCalibrationFeatures mapset.Set[string], etcdConfig etcd.Manager, realEstate string, token string) (mapset.Set[string], map[string]string, error) { +// fetchComponentFeaturesWithClassification fetches the component features from the etcd config +// and classifies them as RTP or FS features, returns both sets and a map of feature to data type +func fetchComponentFeaturesWithClassification(features mapset.Set[string], pctrCalibrationFeatures mapset.Set[string], pcvrCalibrationFeatures mapset.Set[string], etcdConfig etcd.Manager, realEstate string, token string, rtpRegistry map[string]string) (mapset.Set[string], mapset.Set[string], map[string]string, error) { componentList := getComponentList(features, pctrCalibrationFeatures, pcvrCalibrationFeatures) - componentFeatures := mapset.NewSet[string]() + fsFeatures := mapset.NewSet[string]() + newRTPFeatures := mapset.NewSet[string]() featureToDataType := make(map[string]string) for _, component := range componentList.ToSlice() { componentData := etcdConfig.GetComponentData(component) if componentData == nil { - return nil, nil, fmt.Errorf("Component Data: ComponentData for '%s' not found in registry.\nPlease Contact MLP Team to onboard the component.", component) + return nil, nil, nil, fmt.Errorf("Component Data: ComponentData for '%s' not found in registry.\nPlease Contact MLP Team to onboard the component.", component) } + for _, pair := range componentData.FSIdSchemaToValueColumns { if strings.Contains(pair.ValueCol, COLON_DELIMITER) { - componentFeatures.Add(pair.ValueCol) + // Check if this is an RTP feature or FS feature + isRTPFeature := false + + // Check direct match in RTP registry + if _, exists := rtpRegistry[pair.ValueCol]; exists { + isRTPFeature = true + } else { + // Check with prefix removed (for features like "parent:entity:group:feature") + parts := strings.Split(pair.ValueCol, COLON_DELIMITER) + if len(parts) == 4 { + withoutPrefix := strings.Join(parts[1:], COLON_DELIMITER) + if _, exists := rtpRegistry[withoutPrefix]; exists { + isRTPFeature = true + } + } + } + + if isRTPFeature { + newRTPFeatures.Add(pair.ValueCol) + } else { + fsFeatures.Add(pair.ValueCol) + } featureToDataType[pair.ValueCol] = pair.DataType } } if override, hasOverride := componentData.Overridecomponent[realEstate]; hasOverride { - componentFeatures.Add(override.ComponentId) + // Override components are always FS features + fsFeatures.Add(override.ComponentId) parts := strings.Split(override.ComponentId, COLON_DELIMITER) var label, group string @@ -526,23 +609,23 @@ func fetchComponentFeatures(features mapset.Set[string], pctrCalibrationFeatures } else if len(parts) == 4 { label, group = parts[1], parts[2] } else { - return nil, nil, fmt.Errorf("Component Data: invalid override component id: %s", override.ComponentId) + return nil, nil, nil, fmt.Errorf("Component Data: invalid override component id: %s", override.ComponentId) } featureGroupDataTypeMap, err := GetFeatureGroupDataTypeMap(label, token) if err != nil { - return nil, nil, fmt.Errorf("Component Data: error getting feature group data type map: %w", err) + return nil, nil, nil, fmt.Errorf("Component Data: error getting feature group data type map: %w", err) } if dataType, exists := featureGroupDataTypeMap[group]; exists { featureToDataType[override.ComponentId] = dataType } else { - return nil, nil, fmt.Errorf("Component Data: feature group data type not found for %s: %s", override.ComponentId, group) + return nil, nil, nil, fmt.Errorf("Component Data: feature group data type not found for %s: %s", override.ComponentId, group) } } } - return componentFeatures, featureToDataType, nil + return fsFeatures, newRTPFeatures, featureToDataType, nil } // getComponentList gets the component list from the features @@ -861,7 +944,10 @@ func GetRTPComponents(request InferflowOnboardRequest, rtpFeatures mapset.Set[st return rtpComponents, nil } - featureDataTypeMap, err := GetRTPFeatureGroupDataTypeMap(token) + featureDataTypeMap, err := GetRTPFeatureGroupDataTypeMap() + if err != nil && inferflow.IsMeeshoEnabled { + return rtpComponents, nil + } rtpFeatureComponentsMap := GetRTPFeatureLabelToPrefixToFeatureGroupToFeatureMap(rtpFeatures.ToSlice()) for label, prefixToFeatureGroupToFeatureMap := range rtpFeatureComponentsMap { if err != nil { @@ -1006,37 +1092,52 @@ func GetRTPFeatureLabelToPrefixToFeatureGroupToFeatureMap(featureStrings []strin return featuresMap } -func GetRTPFeatureGroupDataTypeMap(token string) (map[string]string, error) { - rtpFeatureToDataType := make(map[string]string) - clientInstance := client.Instance(rtpClientVersion) - rtpStruct, err := clientInstance.GetDataTypes(context.Background(), &models.GetDataTypesRequest{}, map[string]string{}) - if err != nil { - return nil, err - } - for _, entity := range rtpStruct.Entities { - for _, featureGroup := range entity.FeatureGroups { - for _, feature := range featureGroup.Features { - compositeName := entity.Entity + COLON_DELIMITER + featureGroup.Label + COLON_DELIMITER + feature - rtpFeatureToDataType[compositeName] = featureGroup.DataType - } - } - } - return rtpFeatureToDataType, nil +func GetRTPFeatureGroupDataTypeMap() (map[string]string, error) { + return externalcall.PricingClient.GetFeatureGroupDataTypeMap() } func GetPredatorComponents(request InferflowOnboardRequest, offlineToOnlineMapping map[string]string) ([]PredatorComponent, error) { predatorComponents := make([]PredatorComponent, 0, len(request.Payload.Rankers)) for i, ranker := range request.Payload.Rankers { + + // validate routing config + if len(ranker.RoutingConfig) > 0 { + totalRoutingPercentage := float32(0) + defaultEndPointFallback := false + for _, route := range ranker.RoutingConfig { + if route.ModelName == "" || route.ModelEndpoint == "" { + return nil, fmt.Errorf("Predator Components: model name or model endpoint is missing for routing config") + } + if route.RoutingPercentage < 0 { + return nil, fmt.Errorf("Predator Components: routing percentage is less than 0 for routing config") + } + totalRoutingPercentage += route.RoutingPercentage + if route.ModelEndpoint == ranker.EndPoint { + defaultEndPointFallback = true + } + } + if defaultEndPointFallback { + if totalRoutingPercentage != 100.0 { + return nil, fmt.Errorf("Default endpoint included but total routing percentage is not 100") + } + } else { + if totalRoutingPercentage > 100.0 { + return nil, fmt.Errorf("Total routing percentage is greater than 100") + } + } + } + predatorComponent := PredatorComponent{ Component: "p" + strconv.Itoa(i+1), ComponentID: strings.Join(ranker.EntityID, COLON_DELIMITER), ModelName: ranker.ModelName, ModelEndPoint: ranker.EndPoint, - Deadline: 110, - BatchSize: 250, + Deadline: ranker.Deadline, + BatchSize: ranker.BatchSize, Inputs: make([]PredatorInput, 0, len(ranker.Inputs)), Outputs: make([]PredatorOutput, 0, len(ranker.Outputs)), + RoutingConfig: ranker.RoutingConfig, } if ranker.Calibration != "" { @@ -1184,7 +1285,7 @@ func GetDagExecutionConfig(request InferflowOnboardRequest, featureComponents [] for _, component := range featureComponents { componentName := component.Component - specificDependencies := findSpecificFeatureDependencies(component, featureComponents, etcdConfig, request) + specificDependencies := findSpecificFeatureDependencies(component, featureComponents, rtpComponents) if len(specificDependencies) > 0 { dagExecutionConfig.ComponentDependency[componentName] = append(dagExecutionConfig.ComponentDependency[componentName], specificDependencies...) @@ -1234,7 +1335,7 @@ func GetDagExecutionConfig(request InferflowOnboardRequest, featureComponents [] return dagExecutionConfig, nil } -func findSpecificFeatureDependencies(featureComp FeatureComponent, featureComponents []FeatureComponent, etcdConfig etcd.Manager, request InferflowOnboardRequest) []string { +func findSpecificFeatureDependencies(featureComp FeatureComponent, featureComponents []FeatureComponent, rtpComponents []RTPComponent) []string { var dependencies []string requiredInputs := make(map[string]struct{}) @@ -1260,6 +1361,23 @@ func findSpecificFeatureDependencies(featureComp FeatureComponent, featureCompon } } + for _, rtpComp := range rtpComponents { + if done, ok := completedComponents[rtpComp.Component]; ok && done { + continue + } + colNamePrefix := rtpComp.ColNamePrefix + for _, featureGroup := range rtpComp.FeatureRequest.FeatureGroups { + for _, feature := range featureGroup.Features { + featureKey := colNamePrefix + rtpComp.FeatureRequest.Label + COLON_DELIMITER + featureGroup.Label + COLON_DELIMITER + feature + if _, required := requiredInputs[featureKey]; required && !completedComponents[rtpComp.Component] { + dependencies = append(dependencies, rtpComp.Component) + completedComponents[rtpComp.Component] = true + break + } + } + } + } + return dependencies } diff --git a/horizon/internal/inferflow/handler/inferflow.go b/horizon/internal/inferflow/handler/inferflow.go index 8fa97216..23149c4e 100644 --- a/horizon/internal/inferflow/handler/inferflow.go +++ b/horizon/internal/inferflow/handler/inferflow.go @@ -10,6 +10,7 @@ import ( "github.com/Meesho/BharatMLStack/horizon/internal/externalcall" mainHandler "github.com/Meesho/BharatMLStack/horizon/internal/externalcall" + inferflowPkg "github.com/Meesho/BharatMLStack/horizon/internal/inferflow" etcd "github.com/Meesho/BharatMLStack/horizon/internal/inferflow/etcd" pb "github.com/Meesho/BharatMLStack/horizon/internal/inferflow/handler/proto/protogen" discovery_config "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/discoveryconfig" @@ -99,45 +100,29 @@ func (m *InferFlow) Onboard(request InferflowOnboardRequest, token string) (Resp configId := fmt.Sprintf("%s-%s-%s", request.Payload.RealEstate, request.Payload.Tenant, request.Payload.ConfigIdentifier) - requests, err := m.InferFlowRequestRepo.GetAll() + response, err := m.ValidateOnboardRequest(request.Payload) if err != nil { - return Response{}, errors.New("failed to get all requests: " + err.Error()) + return Response{}, errors.New("failed to validate onboard request: " + err.Error()) + } + if response.Error != emptyResponse { + return Response{}, errors.New("model proxy config is invalid: " + response.Error) } exists := false - configs, err := m.InferFlowConfigRepo.GetByID(configId) - if err != nil { + if err == nil { exists = configs.Active } if exists { return Response{}, errors.New("Config ID already exists") } - for _, request := range requests { - if request.ConfigID == configId { - if request.RequestType == onboardRequestType || request.RequestType == cloneRequestType { - if request.Status == rejected { - exists = false - } else { - exists = true - } - } - if request.RequestType == deleteRequestType && request.Status == approved { - exists = false - } - } - } - if exists { - return Response{}, errors.New("Config ID already exists") - } - inferFlowConfig, err := m.GetInferflowConfig(request, token) if err != nil { return Response{}, errors.New("failed to generate model proxy config: " + err.Error()) } - response, err := ValidateInferFlowConfig(inferFlowConfig, token) + response, err = ValidateInferFlowConfig(inferFlowConfig, token) if err != nil { return Response{}, errors.New("failed to validate model proxy config: " + err.Error()) } @@ -218,6 +203,15 @@ func (m *InferFlow) Promote(request PromoteConfigRequest) (Response, error) { func (m *InferFlow) Edit(request EditConfigOrCloneConfigRequest, token string) (Response, error) { configId := fmt.Sprintf("%s-%s-%s", request.Payload.RealEstate, request.Payload.Tenant, request.Payload.ConfigIdentifier) + + response, err := m.ValidateOnboardRequest(request.Payload) + if err != nil { + return Response{}, errors.New("failed to validate onboard request: " + err.Error()) + } + if response.Error != emptyResponse { + return Response{}, errors.New("onboard request is invalid: " + response.Error) + } + exists, err := m.InferFlowConfigRepo.DoesConfigIDExist(configId) if err != nil { return Response{}, errors.New("failed to check if config id exists in db: " + err.Error()) @@ -226,16 +220,23 @@ func (m *InferFlow) Edit(request EditConfigOrCloneConfigRequest, token string) ( return Response{}, errors.New("Config ID does not exist") } - existingConfigs, err := m.InferFlowRequestRepo.GetAll() + latestUnapprovedRequests, err := m.InferFlowRequestRepo.GetLatestPendingRequestByConfigID(configId) + if err == nil { + if len(latestUnapprovedRequests) > 0 { + if latestUnapprovedRequests[0].RequestType == editRequestType { + return Response{}, errors.New("Pending edit request with request ID: " + strconv.Itoa(int(latestUnapprovedRequests[0].RequestID))) + } + } + } + + existingConfigs, err := m.InferFlowRequestRepo.GetApprovedRequestsByConfigID(configId) if err != nil { return Response{}, errors.New("failed to get existing configs: " + err.Error()) } newVersion := 1 - for _, config := range existingConfigs { - if config.ConfigID == configId && config.Status == approved { - newVersion = config.Version + 1 - } + if len(existingConfigs) > 0 { + newVersion = existingConfigs[0].Version + 1 } onboardRequest := InferflowOnboardRequest{ @@ -248,7 +249,7 @@ func (m *InferFlow) Edit(request EditConfigOrCloneConfigRequest, token string) ( return Response{}, errors.New("failed to get infer flow config: " + err.Error()) } - response, err := ValidateInferFlowConfig(inferFlowConfig, token) + response, err = ValidateInferFlowConfig(inferFlowConfig, token) if err != nil { return Response{}, errors.New("failed to validate model proxy config: " + err.Error()) } @@ -284,6 +285,15 @@ func (m *InferFlow) Edit(request EditConfigOrCloneConfigRequest, token string) ( func (m *InferFlow) Clone(request EditConfigOrCloneConfigRequest, token string) (Response, error) { configId := fmt.Sprintf("%s-%s-%s", request.Payload.RealEstate, request.Payload.Tenant, request.Payload.ConfigIdentifier) + + response, err := m.ValidateOnboardRequest(request.Payload) + if err != nil { + return Response{}, errors.New("failed to validate onboard request: " + err.Error()) + } + if response.Error != emptyResponse { + return Response{}, errors.New("onboard request is invalid: " + response.Error) + } + exists, err := m.InferFlowRequestRepo.DoesConfigIDExist(configId) if err != nil { return Response{}, errors.New("failed to check if config id exists in db: " + err.Error()) @@ -292,6 +302,13 @@ func (m *InferFlow) Clone(request EditConfigOrCloneConfigRequest, token string) return Response{}, errors.New("Config ID already exists") } + // remove routing config from request + for _, ranker := range request.Payload.Rankers { + if len(ranker.RoutingConfig) > 0 { + ranker.RoutingConfig = make([]RoutingConfig, 0) + } + } + onboardRequest := InferflowOnboardRequest{ Payload: request.Payload, CreatedBy: request.CreatedBy, @@ -302,7 +319,7 @@ func (m *InferFlow) Clone(request EditConfigOrCloneConfigRequest, token string) return Response{}, errors.New("failed to get infer flow config: " + err.Error()) } - response, err := ValidateInferFlowConfig(inferFlowConfig, token) + response, err = ValidateInferFlowConfig(inferFlowConfig, token) if err != nil { return Response{}, errors.New("failed to validate infer flow config: " + err.Error()) } @@ -355,6 +372,8 @@ func (m *InferFlow) ScaleUp(request ScaleUpConfigRequest) (Response, error) { request.Payload.ConfigValue.ComponentConfig.PredatorComponents[i].ModelName = modelNameToEndPointMap[modelName].NewModelName } + request.Payload.ConfigValue.ResponseConfig.LoggingPerc = request.Payload.LoggingPerc + payload, err := AdaptScaleUpRequestToDBPayload(request) if err != nil { return Response{}, errors.New("failed to adapt scale up request to db payload: " + err.Error()) @@ -834,13 +853,61 @@ func ValidateInferFlowConfig(config InferflowConfig, token string) (Response, er }, nil } +func (m *InferFlow) ValidateOnboardRequest(request OnboardPayload) (Response, error) { + for _, ranker := range request.Rankers { + if len(ranker.EntityID) == 0 { + return Response{ + Error: "Entity ID is not set for model: " + ranker.ModelName, + Data: Message{Message: emptyResponse}, + }, errors.New("Entity ID is not set for model: " + ranker.ModelName) + } + for _, output := range ranker.Outputs { + if len(output.ModelScores) != len(output.ModelScoresDims) { + return Response{ + Error: "model scores and model scores dims are not equal for model: " + ranker.ModelName, + Data: Message{Message: emptyResponse}, + }, errors.New("model scores and model scores dims are not equal for model: " + ranker.ModelName) + } + } + } + + for _, reRanker := range request.ReRankers { + if len(reRanker.EntityID) == 0 { + return Response{ + Error: "Entity ID is not set for re ranker: " + reRanker.Score, + Data: Message{Message: emptyResponse}, + }, errors.New("Entity ID is not set for re ranker: " + reRanker.Score) + } + for _, value := range reRanker.EqVariables { + parts := strings.Split(value, PIPE_DELIMITER) + if len(parts) != 2 { + return Response{ + Error: "invalid eq variable: " + value, + Data: Message{Message: emptyResponse}, + }, errors.New("invalid eq variable: " + value) + } + if parts[1] == "" { + return Response{ + Error: "invalid eq variable: " + value, + Data: Message{Message: emptyResponse}, + }, errors.New("invalid eq variable: " + value) + } + } + } + + return Response{ + Error: emptyResponse, + Data: Message{Message: "Request validated successfully"}, + }, nil +} + func (m *InferFlow) GenerateFunctionalTestRequest(request GenerateRequestFunctionalTestingRequest) (GenerateRequestFunctionalTestingResponse, error) { response := GenerateRequestFunctionalTestingResponse{ RequestBody: RequestBody{ Entities: []Entity{ { - Entity: "catalog_id", + Entity: request.Entity + "_id", Ids: []string{}, Features: []FeatureValue{}, }, @@ -855,7 +922,7 @@ func (m *InferFlow) GenerateFunctionalTestRequest(request GenerateRequestFunctio return response, errors.New("invalid batch size: " + err.Error()) } - response.RequestBody.Entities[0].Entity = "catalog_id" + response.RequestBody.Entities[0].Entity = request.Entity + "_id" response.RequestBody.Entities[0].Ids = random.GenerateRandomIntSliceWithRange(batchSize, 100000, 1000000) for feature, value := range request.DefaultFeatures { @@ -885,9 +952,17 @@ func (m *InferFlow) ExecuteFuncitonalTestRequest(request ExecuteRequestFunctiona ep = strings.TrimPrefix(ep, "https://") } ep = strings.TrimSuffix(ep, "/") - if !strings.Contains(ep, ":") { - ep = ep + ":8080" + if idx := strings.LastIndex(ep, ":"); idx != -1 { + if idx < len(ep)-1 { + ep = ep[:idx] + } } + port := ":8080" + env := strings.ToLower(strings.TrimSpace(inferflowPkg.AppEnv)) + if env == "stg" || env == "int" { + port = ":80" + } + ep = ep + port return ep }(request.EndPoint) @@ -988,25 +1063,26 @@ func (m *InferFlow) ExecuteFuncitonalTestRequest(request ExecuteRequestFunctiona } func (m *InferFlow) GetLatestRequest(requestID string) (GetLatestRequestResponse, error) { - requests, err := m.InferFlowRequestRepo.GetAll() + requests, err := m.InferFlowRequestRepo.GetApprovedRequestsByConfigID(requestID) if err != nil { return GetLatestRequestResponse{ Error: "failed to get latest request: " + err.Error(), Data: RequestConfig{}, }, err } + latestRequest := inferflow_request.Table{} - for _, request := range requests { - if request.Status == approved && request.ConfigID == requestID { - latestRequest = request - } + if len(requests) > 0 { + latestRequest = requests[0] } + if latestRequest.Payload.RequestPayload.ConfigIdentifier == "" { return GetLatestRequestResponse{ Error: "failed to find latest request", Data: RequestConfig{}, }, err } + return GetLatestRequestResponse{ Error: emptyResponse, Data: RequestConfig{ diff --git a/horizon/internal/inferflow/handler/models.go b/horizon/internal/inferflow/handler/models.go index 1d6bfd84..c595ba93 100644 --- a/horizon/internal/inferflow/handler/models.go +++ b/horizon/internal/inferflow/handler/models.go @@ -20,13 +20,22 @@ type Output struct { DataType string `json:"data_type"` } +type RoutingConfig struct { + ModelName string `json:"model_name"` + ModelEndpoint string `json:"model_endpoint"` + RoutingPercentage float32 `json:"routing_percentage"` +} + type Ranker struct { - ModelName string `json:"model_name"` - Calibration string `json:"calibration"` - EndPoint string `json:"end_point"` - Inputs []Input `json:"inputs"` - Outputs []Output `json:"outputs"` - EntityID []string `json:"entity_id"` + ModelName string `json:"model_name"` + BatchSize int `json:"batch_size"` + Deadline int `json:"deadline"` + Calibration string `json:"calibration"` + EndPoint string `json:"end_point"` + Inputs []Input `json:"inputs"` + Outputs []Output `json:"outputs"` + EntityID []string `json:"entity_id"` + RoutingConfig []RoutingConfig `json:"route_config"` } type ReRanker struct { @@ -120,6 +129,7 @@ type PredatorComponent struct { BatchSize int `json:"batch_size"` Inputs []PredatorInput `json:"inputs"` Outputs []PredatorOutput `json:"outputs"` + RoutingConfig []RoutingConfig `json:"route_config,omitempty"` } type FinalResponseConfig struct { @@ -175,7 +185,7 @@ type ComponentConfig struct { CacheTTL int `json:"cache_ttl"` CacheVersion int `json:"cache_version"` FeatureComponents []FeatureComponent `json:"feature_components"` - RTPComponents []RTPComponent `json:"real_time_pricing_feature_components"` + RTPComponents []RTPComponent `json:"real_time_pricing_feature_components,omitempty"` PredatorComponents []PredatorComponent `json:"predator_components"` NumerixComponents []NumerixComponent `json:"numerix_components"` } @@ -248,6 +258,7 @@ type ScaleUpConfigPayload struct { ConfigID string `json:"config_id"` ConfigValue InferflowConfig `json:"config_value"` ConfigMapping ConfigMapping `json:"config_mapping"` + LoggingPerc int `json:"logging_perc"` ModelNameToEndPointMap []ModelNameToEndPointMap `json:"proposed_model_endpoints"` } @@ -377,6 +388,7 @@ type GenerateRequestFunctionalTestingRequest struct { MetaData map[string]string `json:"meta_data"` DefaultFeatures map[string]string `json:"default_features"` ModelConfigID string `json:"model_config_id"` + Entity string `json:"entity"` } type GenerateRequestFunctionalTestingResponse struct { diff --git a/horizon/internal/inferflow/init.go b/horizon/internal/inferflow/init.go index 3ade265d..5be4bed6 100644 --- a/horizon/internal/inferflow/init.go +++ b/horizon/internal/inferflow/init.go @@ -10,6 +10,7 @@ var ( InferflowAppName string AppEnv string HorizonAppName string + IsMeeshoEnabled bool initOnce sync.Once ) @@ -18,5 +19,6 @@ func Init(config configs.Configs) { InferflowAppName = config.InferflowAppName AppEnv = config.AppEnv HorizonAppName = config.HorizonAppName + IsMeeshoEnabled = config.IsMeeshoEnabled }) } diff --git a/horizon/internal/init.go b/horizon/internal/init.go index c09ea61c..99e94355 100644 --- a/horizon/internal/init.go +++ b/horizon/internal/init.go @@ -6,6 +6,7 @@ import ( "github.com/Meesho/BharatMLStack/horizon/internal/externalcall" inferflow "github.com/Meesho/BharatMLStack/horizon/internal/inferflow" "github.com/Meesho/BharatMLStack/horizon/internal/numerix" + onlinefeaturestore "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store" "github.com/Meesho/BharatMLStack/horizon/internal/predator" ) @@ -16,5 +17,5 @@ func InitAll(config configs.Configs) { predator.Init(config) deployableHandler.Init(config) inferflow.Init(config) - + onlinefeaturestore.Init(config) } diff --git a/horizon/internal/online-feature-store/config/etcd.go b/horizon/internal/online-feature-store/config/etcd.go index a39fa35a..c34a21d9 100644 --- a/horizon/internal/online-feature-store/config/etcd.go +++ b/horizon/internal/online-feature-store/config/etcd.go @@ -9,10 +9,10 @@ import ( "strconv" "strings" + onlinefeaturestore "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store" "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store/config/enums" "github.com/Meesho/BharatMLStack/horizon/pkg/etcd" "github.com/rs/zerolog/log" - "github.com/spf13/viper" ) const ( @@ -28,9 +28,9 @@ type Etcd struct { func NewEtcdConfig() Manager { return &Etcd{ - instance: etcd.Instance()[viper.GetString("ONLINE_FEATURE_STORE_APP_NAME")], - appName: viper.GetString("ONLINE_FEATURE_STORE_APP_NAME"), - env: viper.GetString("APP_ENV"), + instance: etcd.Instance()[onlinefeaturestore.OnlineFeatureStoreAppName], + appName: onlinefeaturestore.OnlineFeatureStoreAppName, + env: onlinefeaturestore.AppEnv, } } diff --git a/horizon/internal/online-feature-store/handler/online_feature_store.go b/horizon/internal/online-feature-store/handler/online_feature_store.go index ad8710eb..11b50fc0 100644 --- a/horizon/internal/online-feature-store/handler/online_feature_store.go +++ b/horizon/internal/online-feature-store/handler/online_feature_store.go @@ -8,8 +8,7 @@ import ( "strings" "time" - "github.com/spf13/viper" - + onlinefeaturestore "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store" "github.com/Meesho/BharatMLStack/horizon/internal/repositories/scylla" "github.com/Meesho/BharatMLStack/horizon/pkg/infra" @@ -47,31 +46,27 @@ var ( func InitV1ConfigHandler() Config { if config == nil { once.Do(func() { - scyllaActiveConfIdsStr := viper.GetString(storageScyllaPrefix + activeConfIds) - redisFailoverActiveConfIdsStr := viper.GetString(storageRedisFailoverPrefix + activeConfIds) - - scyllaStores := make(map[int]scylla.Store) - - if scyllaActiveConfIdsStr != "" { - scyllaActiveIds := strings.Split(scyllaActiveConfIdsStr, ",") - scyllaStores = make(map[int]scylla.Store, len(scyllaActiveIds)) - for _, configIdStr := range scyllaActiveIds { - confIdToDbTypeMap[configIdStr] = "scylla" - activeConfigId, err := strconv.Atoi(configIdStr) - if err != nil { - log.Error().Msgf("Error in converting config id %s to int", configIdStr) - continue - } - connFacade, _ := infra.Scylla.GetConnection(activeConfigId) - conn := connFacade.(*infra.ScyllaClusterConnection) - scyllaStore, err2 := scylla.NewRepository(conn) - if err2 != nil { - log.Error().Msgf("Error in creating scylla store") - } - scyllaStores[activeConfigId] = scyllaStore + scyllaActiveConfIdsStr := onlinefeaturestore.ScyllaActiveConfIdsStr + redisFailoverActiveConfIdsStr := onlinefeaturestore.RedisFailoverActiveConfIdsStr + if scyllaActiveConfIdsStr == "" { + return + } + scyllaActiveIds := strings.Split(scyllaActiveConfIdsStr, ",") + scyllaStores := make(map[int]scylla.Store, len(scyllaActiveIds)) + for _, configIdStr := range scyllaActiveIds { + confIdToDbTypeMap[configIdStr] = "scylla" + activeConfigId, err := strconv.Atoi(configIdStr) + if err != nil { + log.Error().Msgf("Error in converting config id %s to int", configIdStr) + continue + } + connFacade, _ := infra.Scylla.GetConnection(activeConfigId) + conn := connFacade.(*infra.ScyllaClusterConnection) + scyllaStore, err2 := scylla.NewRepository(conn) + if err2 != nil { + log.Error().Msgf("Error in creating scylla store") } - } else { - log.Warn().Msg("SCYLLA_ACTIVE_CONFIG_IDS not configured, running without Scylla stores") + scyllaStores[activeConfigId] = scyllaStore } if redisFailoverActiveConfIdsStr != "" { redisFailoverActiveIds := strings.Split(redisFailoverActiveConfIdsStr, ",") diff --git a/horizon/internal/online-feature-store/init.go b/horizon/internal/online-feature-store/init.go new file mode 100644 index 00000000..99396911 --- /dev/null +++ b/horizon/internal/online-feature-store/init.go @@ -0,0 +1,24 @@ +package onlinefeaturestore + +import ( + "sync" + + "github.com/Meesho/BharatMLStack/horizon/internal/configs" +) + +var ( + initOnlineFeatureStoreOnce sync.Once + ScyllaActiveConfIdsStr string + RedisFailoverActiveConfIdsStr string + OnlineFeatureStoreAppName string + AppEnv string +) + +func Init(config configs.Configs) { + initOnlineFeatureStoreOnce.Do(func() { + ScyllaActiveConfIdsStr = config.ScyllaActiveConfIds + RedisFailoverActiveConfIdsStr = config.RedisFailoverActiveConfIds + OnlineFeatureStoreAppName = config.OnlineFeatureStoreAppName + AppEnv = config.AppEnv + }) +} diff --git a/horizon/internal/predator/handler/predator.go b/horizon/internal/predator/handler/predator.go index e5fd2da6..98dd4a39 100644 --- a/horizon/internal/predator/handler/predator.go +++ b/horizon/internal/predator/handler/predator.go @@ -3786,6 +3786,9 @@ func (p *Predator) validateOfflineFeatures(features []string, token string) erro // validatePricingFeatures validates pricing features for a specific entity func (p *Predator) validatePricingFeatures(entity string, features []string) error { + if !pred.IsMeeshoEnabled { + return nil + } response, err := externalcall.PricingClient.GetDataTypes(entity) if err != nil { return fmt.Errorf("failed to call pricing service API: %w", err) diff --git a/horizon/internal/predator/init.go b/horizon/internal/predator/init.go index e73b9b9c..587fda28 100644 --- a/horizon/internal/predator/init.go +++ b/horizon/internal/predator/init.go @@ -14,6 +14,7 @@ var ( TestDeployableID int TestGpuDeployableID int initOnce sync.Once + IsMeeshoEnabled bool ) func Init(config configs.Configs) { @@ -24,6 +25,7 @@ func Init(config configs.Configs) { GcsModelBasePath = config.GcsModelBasePath TestDeployableID = config.TestDeployableID TestGpuDeployableID = config.TestGpuDeployableID + IsMeeshoEnabled = config.IsMeeshoEnabled }) } diff --git a/horizon/internal/repositories/sql/inferflow/models.go b/horizon/internal/repositories/sql/inferflow/models.go index 108308f1..68ecce49 100644 --- a/horizon/internal/repositories/sql/inferflow/models.go +++ b/horizon/internal/repositories/sql/inferflow/models.go @@ -33,6 +33,12 @@ type PredatorOutput struct { DataType string `json:"data_type"` } +type RoutingConfig struct { + ModelName string `json:"model_name"` + ModelEndpoint string `json:"model_endpoint"` + RoutingPercentage float32 `json:"routing_percentage"` +} + type PredatorComponent struct { Component string `json:"component"` ComponentID string `json:"component_id"` @@ -43,6 +49,7 @@ type PredatorComponent struct { BatchSize int `json:"batch_size"` Inputs []PredatorInput `json:"inputs"` Outputs []PredatorOutput `json:"outputs"` + RoutingConfig []RoutingConfig `json:"route_config,omitempty"` } type ResponseConfig struct { @@ -97,7 +104,7 @@ type ComponentConfig struct { CacheTTL int `json:"cache_ttl"` CacheVersion int `json:"cache_version"` FeatureComponents []FeatureComponent `json:"feature_components"` - RTPComponents []RTPComponent `json:"real_time_pricing_feature_components"` + RTPComponents []RTPComponent `json:"real_time_pricing_feature_components,omitempty"` PredatorComponents []PredatorComponent `json:"predator_components"` NumerixComponents []NumerixComponent `json:"numerix_components"` } @@ -126,12 +133,15 @@ type OnboardPayload struct { } type OnboardRanker struct { - ModelName string `json:"model_name"` - Calibration string `json:"calibration"` - EndPoint string `json:"end_point"` - Inputs []PredatorInput `json:"inputs"` - Outputs []PredatorOutput `json:"outputs"` - EntityID []string `json:"entity_id"` + ModelName string `json:"model_name"` + BatchSize int `json:"batch_size"` + Deadline int `json:"deadline"` + Calibration string `json:"calibration"` + EndPoint string `json:"end_point"` + Inputs []PredatorInput `json:"inputs"` + Outputs []PredatorOutput `json:"outputs"` + EntityID []string `json:"entity_id"` + RoutingConfig []RoutingConfig `json:"route_config,omitempty"` } type OnboardReRanker struct { diff --git a/horizon/internal/repositories/sql/inferflow/request/repository.go b/horizon/internal/repositories/sql/inferflow/request/repository.go index eeed1c5e..dbe5d34d 100644 --- a/horizon/internal/repositories/sql/inferflow/request/repository.go +++ b/horizon/internal/repositories/sql/inferflow/request/repository.go @@ -21,6 +21,8 @@ type Repository interface { DoesConfigIdExistWithRequestType(configID string, requestType string) (bool, error) CurrentRequestStatus(requestID uint) (string, error) Deactivate(configID string) error + GetLatestPendingRequestByConfigID(configID string) ([]Table, error) + GetApprovedRequestsByConfigID(configID string) ([]Table, error) } type InferflowRequest struct { @@ -126,3 +128,19 @@ func (g *InferflowRequest) DoesRequestIDExistWithStatus(requestID uint, status s func (g *InferflowRequest) Deactivate(configID string) error { return g.db.Model(&Table{}).Where("config_id = ?", configID).Update("active", false).Error } + +func (g *InferflowRequest) GetApprovedRequestsByConfigID(configID string) ([]Table, error) { + var tables []Table + result := g.db.Where("config_id = ? AND status = ?", configID, "APPROVED"). + Order("created_at DESC"). + Find(&tables) + return tables, result.Error +} + +func (g *InferflowRequest) GetLatestPendingRequestByConfigID(configID string) ([]Table, error) { + var tables []Table + result := g.db.Where("config_id = ? AND status NOT IN (?)", configID, []string{"APPROVED", "REJECTED"}). + Order("created_at DESC"). + Find(&tables) + return tables, result.Error +} diff --git a/horizon/pkg/infra/init.go b/horizon/pkg/infra/init.go index 7d95d285..53d1799c 100644 --- a/horizon/pkg/infra/init.go +++ b/horizon/pkg/infra/init.go @@ -1,19 +1,23 @@ package infra -import "sync" +import ( + "sync" + + "github.com/Meesho/BharatMLStack/horizon/internal/configs" +) var ( mut sync.Mutex ConfIdDBTypeMap = make(map[int]DBType) ) -func InitDBConnectors() { +func InitDBConnectors(config configs.Configs) { mut.Lock() defer mut.Unlock() if Scylla == nil { - initScyllaClusterConns() + initScyllaClusterConns(config) } if SQL == nil { - initSQLConns() + initSQLConns(config) } } diff --git a/horizon/pkg/infra/scylla.go b/horizon/pkg/infra/scylla.go index db0c0225..3152fa33 100644 --- a/horizon/pkg/infra/scylla.go +++ b/horizon/pkg/infra/scylla.go @@ -5,9 +5,9 @@ import ( "strconv" "strings" + "github.com/Meesho/BharatMLStack/horizon/internal/configs" "github.com/gocql/gocql" "github.com/rs/zerolog/log" - "github.com/spf13/viper" ) var ( @@ -49,8 +49,8 @@ func (s *ScyllaConnectors) GetConnection(configId int) (ConnectionFacade, error) return conn, nil } -func initScyllaClusterConns() { - activeConfIdsStr := viper.GetString(storageScyllaPrefix + activeConfIds) +func initScyllaClusterConns(config configs.Configs) { + activeConfIdsStr := config.ScyllaActiveConfIds if activeConfIdsStr == "" { return } diff --git a/horizon/pkg/infra/sql.go b/horizon/pkg/infra/sql.go index 3054b5ea..74ace265 100644 --- a/horizon/pkg/infra/sql.go +++ b/horizon/pkg/infra/sql.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/Meesho/BharatMLStack/horizon/internal/configs" "github.com/rs/zerolog/log" "gorm.io/driver/mysql" "gorm.io/gorm" @@ -66,11 +67,11 @@ func (c *SQLConnection) IsLive() bool { return c.Master != nil && c.Slave != nil } -// initSQLConns initializes all SQL connections based on environment configuration -func initSQLConns() { +// initSQLConns initializes all SQL connections based on application configuration +func initSQLConns(config configs.Configs) { // Create master and slave connections from config - masterConfig, slaveConfig, err := BuildSQLConfigFromEnv() + masterConfig, slaveConfig, err := BuildSQLConfigFromConfig(config) if err != nil { log.Panic().Msg(err.Error()) } diff --git a/horizon/pkg/infra/sql_conf_builder.go b/horizon/pkg/infra/sql_conf_builder.go index ca590068..60ac0120 100644 --- a/horizon/pkg/infra/sql_conf_builder.go +++ b/horizon/pkg/infra/sql_conf_builder.go @@ -3,27 +3,7 @@ package infra import ( "errors" - "github.com/spf13/viper" -) - -// Mandatory config keys -// = -// = -// = -// = -// = -const ( - masterHost = "MYSQL_MASTER_HOST" - masterPort = "MYSQL_MASTER_PORT" - masterDBName = "MYSQL_DB_NAME" - masterUsername = "MYSQL_MASTER_USERNAME" - masterPassword = "MYSQL_MASTER_PASSWORD" - - slaveHost = "MYSQL_SLAVE_HOST" - slavePort = "MYSQL_SLAVE_PORT" - slaveDBName = "MYSQL_DB_NAME" - slaveUsername = "MYSQL_SLAVE_USERNAME" - slavePassword = "MYSQL_SLAVE_PASSWORD" + "github.com/Meesho/BharatMLStack/horizon/internal/configs" ) // SQLConfig represents the configuration for a SQL database connection @@ -35,70 +15,69 @@ type SQLConfig struct { Password string } -// BuildSQLConfigFromEnv constructs a SQL configuration for master and slave -// from environment variables using the specified prefix. +// BuildSQLConfigFromConfig constructs a SQL configuration for master and slave +// from the application config struct. // -// The function leverages Viper to read environment variables, ensuring -// a flexible and configurable setup. It extracts key parameters required -// for configuring MySQL connections. +// The function extracts key parameters required for configuring MySQL connections +// from the provided config struct. // -// Mandatory environment variables: -// - MYSQL_MASTER_HOST: Master host -// - MYSQL_MASTER_PORT: Master port -// - MYSQL_DB_NAME: Database name -// - MYSQL_MASTER_USERNAME: Master username -// - MYSQL_MASTER_PASSWORD: Master password +// Mandatory config fields: +// - MysqlMasterHost: Master host +// - MysqlMasterPort: Master port +// - MysqlDbName: Database name +// - MysqlMasterUsername: Master username +// - MysqlMasterPassword: Master password // -// Optional environment variables for slave: -// - MYSQL_SLAVE_HOST: Slave host -// - MYSQL_SLAVE_PORT: Slave port -// - MYSQL_SLAVE_USERNAME: Slave username -// - MYSQL_SLAVE_PASSWORD: Slave password +// Optional config fields for slave: +// - MysqlSlaveHost: Slave host +// - MysqlSlavePort: Slave port +// - MysqlSlaveUsername: Slave username +// - MysqlSlavePassword: Slave password // // Returns: -// - Master and slave SQLConfig instances and an error if mandatory variables are missing -func BuildSQLConfigFromEnv() (master SQLConfig, slave SQLConfig, err error) { +// - Master and slave SQLConfig instances and an error if mandatory fields are missing +func BuildSQLConfigFromConfig(config configs.Configs) (master SQLConfig, slave SQLConfig, err error) { // Check required master configuration - if !viper.IsSet(masterHost) { - return master, slave, errors.New(masterHost + " not set") + if config.MysqlMasterHost == "" { + return master, slave, errors.New("MysqlMasterHost not set") } - if !viper.IsSet(masterPort) { - return master, slave, errors.New(masterPort + " not set") + if config.MysqlMasterPort == 0 { + return master, slave, errors.New("MysqlMasterPort not set") } - if !viper.IsSet(masterDBName) { - return master, slave, errors.New(masterDBName + " not set") + if config.MysqlDbName == "" { + return master, slave, errors.New("MysqlDbName not set") } - if !viper.IsSet(masterUsername) { - return master, slave, errors.New(masterUsername + " not set") + if config.MysqlMasterUsername == "" { + return master, slave, errors.New("MysqlMasterUsername not set") } // Set master configuration master = SQLConfig{ - Host: viper.GetString(masterHost), - Port: viper.GetInt(masterPort), - DBName: viper.GetString(masterDBName), - Username: viper.GetString(masterUsername), - Password: viper.GetString(masterPassword), + Host: config.MysqlMasterHost, + Port: config.MysqlMasterPort, + DBName: config.MysqlDbName, + Username: config.MysqlMasterUsername, + Password: config.MysqlMasterPassword, } // Check if slave configuration is provided - if viper.IsSet(slaveHost) && - viper.IsSet(slaveUsername) && - viper.IsSet(slavePassword) { + if config.MysqlSlaveHost != "" && + config.MysqlSlaveUsername != "" && + config.MysqlSlavePassword != "" { // If slave port is not set, use master port - slavePortValue := viper.GetInt(masterPort) - if viper.IsSet(slavePort) { - slavePortValue = viper.GetInt(slavePort) + slavePortValue := config.MysqlMasterPort + if config.MysqlSlavePort != 0 { + slavePortValue = config.MysqlSlavePort } // Set slave configuration slave = SQLConfig{ - Host: viper.GetString(slaveHost), + Host: config.MysqlSlaveHost, Port: slavePortValue, - DBName: viper.GetString(masterDBName), // Use master DB name by default - Username: viper.GetString(slaveUsername), - Password: viper.GetString(slavePassword), + DBName: config.MysqlDbName, // Use master DB name by default + Username: config.MysqlSlaveUsername, + Password: config.MysqlSlavePassword, } } diff --git a/horizon/pkg/logger/logger.go b/horizon/pkg/logger/logger.go index 534c4124..74ad16cc 100644 --- a/horizon/pkg/logger/logger.go +++ b/horizon/pkg/logger/logger.go @@ -2,14 +2,15 @@ package logger import ( "fmt" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - "github.com/spf13/viper" "os" "runtime/debug" "strconv" "strings" "sync" + + "github.com/Meesho/BharatMLStack/horizon/internal/configs" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) var ( @@ -18,14 +19,14 @@ var ( appName = "" ) -// Init initializes the logger by fetching the log level and app name from the viper configuration -func Init() { +// Init initializes the logger by fetching the log level and app name from the app configuration +func Init(config configs.Configs) { - appName = viper.GetString("APP_NAME") - logLevel := viper.GetString("APP_LOG_LEVEL") + appName = config.AppName + logLevel := config.AppLogLevel if len(appName) == 0 { - panic("APP_NAME is not set!") + panic("App name is not set!") } if len(logLevel) == 0 { log.Warn().Msg("Log level not set, defaulting to WARN") diff --git a/horizon/pkg/metric/metric.go b/horizon/pkg/metric/metric.go index e52a666a..c988b026 100644 --- a/horizon/pkg/metric/metric.go +++ b/horizon/pkg/metric/metric.go @@ -5,8 +5,8 @@ import ( "time" "github.com/DataDog/datadog-go/v5/statsd" + "github.com/Meesho/BharatMLStack/horizon/internal/configs" "github.com/rs/zerolog/log" - "github.com/spf13/viper" ) const ( @@ -32,16 +32,16 @@ var ( ) // Init initializes the metrics client -func Init() { +func Init(config configs.Configs) { if initialized { log.Debug().Msgf("Metrics already initialized!") return } once.Do(func() { var err error - samplingRate = viper.GetFloat64("APP_METRIC_SAMPLING_RATE") - appName = viper.GetString("APP_NAME") - globalTags := getGlobalTags() + samplingRate = config.AppMetricSamplingRate + appName = config.AppName + globalTags := getGlobalTags(config) statsDClient, err = statsd.New( telegrafAddress, @@ -62,12 +62,12 @@ func getDefaultClient() *statsd.Client { return client } -func getGlobalTags() []string { - env := viper.GetString("APP_ENV") +func getGlobalTags(config configs.Configs) []string { + env := config.AppEnv if len(env) == 0 { log.Warn().Msg("APP_ENV is not set") } - service := viper.GetString("APP_NAME") + service := config.AppName if len(service) == 0 { log.Warn().Msg("APP_NAME is not set") } diff --git a/horizon/pkg/zookeeper/model.go b/horizon/pkg/zookeeper/model.go deleted file mode 100644 index 4e1c5b3c..00000000 --- a/horizon/pkg/zookeeper/model.go +++ /dev/null @@ -1,24 +0,0 @@ -package zookeeper - -import ( - "sync" - - "github.com/go-zookeeper/zk" -) - -type ZKConfig struct { - Server string `koanf:"server"` - Watcher string `koanf:"watcher"` -} - -type Node struct { - Path string `json:"path"` - Data interface{} `json:"data"` - Children map[string]*Node `json:"children"` -} - -type TreeCache struct { - conn *zk.Conn `json:"-"` - TreeLock sync.RWMutex `json:"-"` - RootNode *Node `json:"rootNode"` -} diff --git a/horizon/pkg/zookeeper/zookeeper.go b/horizon/pkg/zookeeper/zookeeper.go deleted file mode 100644 index 58af03f7..00000000 --- a/horizon/pkg/zookeeper/zookeeper.go +++ /dev/null @@ -1,259 +0,0 @@ -package zookeeper - -import ( - "encoding/json" - "fmt" - "strings" - "sync" - "time" - - "github.com/Meesho/BharatMLStack/horizon/internal/configs" - - "github.com/go-zookeeper/zk" - "github.com/rs/zerolog/log" -) - -var ( - conn *zk.Conn - WatcherEnabled string - ZKChannel chan []byte - zkServers string - zkWatcher string - nodePath string - initZKConfigsOnce sync.Once -) - -func InitZKConnection(config configs.Configs) { - initZKConfigsOnce.Do(func() { - zkServers = config.ZookeeperServer - zkWatcher = config.ZookeeperWatcher - nodePath = config.ZookeeperBasePath - }) - var zkConfig ZKConfig - if len(zkWatcher) == 0 || len(zkServers) == 0 { - log.Panic().Msg("Zookeeper server or watcher is not set") - } - zkConfig.Server = zkServers - zkConfig.Watcher = zkWatcher - - servers := strings.Split(zkConfig.Server, ",") - WatcherEnabled = zkConfig.Watcher - log.Info().Msgf("Is Watcher Enabled: %s", WatcherEnabled) - var err error - conn, _, err = zk.Connect(servers, time.Second*5) - if err != nil { - log.Panic().Err(err).Msg("Unable to connect to zk server ") - } - ZKChannel = make(chan []byte, 10) -} - -func WatchZkNode() { - // Create a new TreeCache instance - treeCache := NewTreeCache(conn, nodePath) - WatchAllNodes(nodePath, treeCache, treeCache.RootNode, treeCache.RootNode.Children) - mapDataToStruct(treeCache) -} - -func mapDataToStruct(treeCache *TreeCache) { - jsonData := traverseNodes(treeCache.RootNode, "") - // Convert the JSON data to a byte array - jsonBytes, err := json.MarshalIndent(jsonData, "", " ") - if err != nil { - log.Error().Err(err).Msg("Error creating JSON:") - } - log.Info().Msgf("JSON : %s", string(jsonBytes)) - ZKChannel <- jsonBytes -} - -func traverseNodes(node *Node, prefix string) interface{} { - currentPath := fmt.Sprintf("%s", node.Path) - - // Check if the node has any children - if len(node.Children) > 0 { - // Create a map to hold the JSON data for the current node - jsonData := make(map[string]interface{}) - - // Traverse the children nodes recursively - for key, child := range node.Children { - // Create the nested JSON structure for the child node - childJSON := traverseNodes(child, fmt.Sprintf("%s%s/", currentPath, key)) - - // Assign the child JSON to the corresponding key in the current node's JSON data - jsonData[key] = childJSON - } - - return jsonData - } - - // If the node has no children, assign the node data directly to the current path - return node.Data -} - -func Get(nodePath string) ([]byte, *zk.Stat, error) { - - data, stat, err := conn.Get(nodePath) - if err != nil { - log.Error().Err(err).Msgf("Error getting config from zk path %s ", nodePath) - return nil, nil, err - } - return data, stat, err -} - -func GetChildern(zkPath string) (map[string]string, *zk.Stat, error) { - - children, _, err := conn.Children(zkPath) - if err != nil { - log.Error().Err(err).Msgf("Error getting config from zk path %s ", zkPath) - return nil, nil, err - } - - stringMap := make(map[string]string) - // Iterate over the child nodes and fetch data for each one - for _, child := range children { - nodePath := strings.Join([]string{zkPath, child}, "/") - data, _, err := Get(nodePath) - if err != nil { - return nil, nil, err - } else { - stringMap[child] = string(data) - } - } - return stringMap, nil, nil -} - -func NewTreeCache(conn *zk.Conn, rootPath string) *TreeCache { - return &TreeCache{ - conn: conn, - RootNode: &Node{ - Path: rootPath, - Data: nil, - Children: make(map[string]*Node), - }, - } -} - -func WatchAllNodes(nodePath string, tc *TreeCache, node *Node, rootNodeChildren map[string]*Node) { - // GetApplication the data and set a watch on the node - data, _, watchCh, err := conn.GetW(nodePath) - if err != nil { - log.Error().Err(err).Msg("Error getting data from zk path") - return - } - - // Process the node data or trigger an action based on your requirements - log.Info().Msgf("Node path: %s, Data: %s\n", nodePath, data) - - tc.TreeLock.Lock() - nodePathSplit := strings.Split(nodePath, "/") - node.Path = nodePathSplit[len(nodePathSplit)-1] - node.Data = string(data) - // tc.nodeMap[nodePath] = node - tc.TreeLock.Unlock() - - // Retrieve the child nodes - children, _, childCh, err := conn.ChildrenW(nodePath) - if err != nil { - log.Error().Err(err).Msg("Error getting children from zk path") - return - } - - if WatcherEnabled == "true" { - // Start a goroutine to handle watch events for the current node - go func() { - for { - select { - case event := <-watchCh: - if event.Type == zk.EventNodeDataChanged { - // Node data changed, handle it based on your requirements - log.Info().Msgf("Node data changed: %s\n", event.Path) - - // GetApplication the updated data and trigger an action - data, _, err := conn.Get(event.Path) - if err != nil { - log.Error().Err(err).Msg("Error getting data from zk path") - continue - } - - log.Info().Msgf("Updated data for node %s: %s\n", event.Path, data) - node.Data = string(data) - mapDataToStruct(tc) - // Recreate the watch - _, _, watchCh, err = conn.GetW(event.Path) - if err != nil { - log.Error().Err(err).Msg("Error getting data from zk path") - continue - } - } - // else if event.Type == zk.EventNodeDeleted { - // // Node deleted, handle it based on your requirements - // gozookeeperlogger.Info(fmt.Sprintf("Node deleted: %s\n", event.Path)) - // splitPath := strings.Split(event.Path, "/") - // delete(node.Children, splitPath[len(splitPath)-1]) - // // Remove the watch for the deleted node - // conn.ExistsW(event.Path) - // // mapDataToStruct(tc) - // } - // TODO: Delete is not working as we have to maintain a parent maping as well to delete the node from parent list - // This is not burning problem, Will fix this later - - case childEvent := <-childCh: - if childEvent.Type == zk.EventNodeChildrenChanged { - // Child nodes changed, handle it based on your requirements - log.Info().Msgf("Child nodes changed: %s\n", childEvent.Path) - - // Retrieve the updated list of child nodes and the new watch channel - children, _, _, err := conn.ChildrenW(childEvent.Path) - if err != nil { - log.Error().Err(err).Msg("Error getting children from zk path") - continue - } - - log.Info().Msgf("Updated child nodes for %s:\n", childEvent.Path) - for _, child := range children { - fmt.Println(child) - } - - // Process the updated child nodes - for _, child := range children { - childPath := childEvent.Path + "/" + child - childNode := &Node{ - Path: child, - Data: string(data), - Children: make(map[string]*Node), - } - // checking if the child Node is already present, Then we will skip the Node and Move forward - _, ok := node.Children[child] - if ok { - continue - } - node.Children[child] = childNode - // Recursively watch child nodes - WatchAllNodes(childPath, tc, childNode, childNode.Children) - } - mapDataToStruct(tc) - // Recreate the watch - _, _, watchCh, err = conn.GetW(childEvent.Path) - if err != nil { - log.Error().Err(err).Msg("Error getting data from zk path") - continue - } - } - } - - } - }() - } - // Process the child nodes - for _, child := range children { - childPath := nodePath + "/" + child - - childNode := &Node{ - Path: child, - Data: string(data), - Children: make(map[string]*Node), - } - node.Children[child] = childNode - // Recursively watch child nodes - WatchAllNodes(childPath, tc, childNode, childNode.Children) - } -}