#!/bin/bash
set -euo pipefail

# ==============================================
# Debug mode switch
# ==============================================
# When the first argument is --debug, enable command tracing and drop the
# flag so the remaining positional parameters match the normal usage.
case "${1:-}" in
    --debug)
        set -x
        shift
        ;;
esac

# ==============================================
# Configuration - every environment-specific setting is changed here
# ==============================================
APP_NAME='process'
APP_HOME='/home/mango/process'
PROPERTIES_FILE="$APP_HOME/application.properties"
HOSTS_FILE='/etc/hosts'

# Host name prefix used to select the big-data nodes listed in /etc/hosts
HOST_PREFIX='bigdata'

# Flink runtime parameters
FLINK_PARALLELISM='6'
FLINK_JM_MEMORY='1024M'
FLINK_TM_MEMORY='2048mb'
FLINK_MAIN_CLASS='com.hc.preprocess.app.ProcessAPP'

# Colour codes, stored as backslash sequences and expanded at print time
RED="\033[0;31m"
GREEN="\033[0;32m"
YELLOW="\033[1;33m"
BLUE="\033[0;34m"
NC="\033[0m" # reset / no colour

# ==============================================
# 工具函数
# ==============================================
# Print message $1 on stdout behind a blue [INFO] tag.
# Backslash escapes in the tag colours and in the message are expanded.
info() {
    local message="$1"
    printf '%b\n' "${BLUE}[INFO]${NC} ${message}"
}

# Print message $1 on stdout behind a green [SUCCESS] tag.
# Backslash escapes in the tag colours and in the message are expanded.
success() {
    local message="$1"
    printf '%b\n' "${GREEN}[SUCCESS]${NC} ${message}"
}

# Print message $1 on stdout behind a yellow [WARNING] tag.
# Backslash escapes in the tag colours and in the message are expanded.
warning() {
    local message="$1"
    printf '%b\n' "${YELLOW}[WARNING]${NC} ${message}"
}

# Print message $1 behind a red [ERROR] tag and terminate the script with
# exit status 1. The message goes to stderr so that it stays visible when
# stdout is redirected or captured.
error() {
    echo -e "${RED}[ERROR]${NC} $1" >&2
    exit 1
}

# ==============================================
# Argument check and initialisation
# ==============================================
# At least the jar name is required. The usage line is printed directly
# rather than through error(): error() exits immediately, which would make
# the two example lines below it unreachable.
if [ $# -lt 1 ]; then
    {
        echo -e "${RED}[ERROR]${NC} 用法: $0 [--debug] <jar包名> [启动时间(YYYY-MM-DD HH:MM:SS)]"
        echo "示例1: $0 process-3.0.2.0001.jar 2026-05-14 11:00:02"
        echo "示例2: $0 --debug process-3.0.2.0001.jar"
    } >&2
    exit 1
fi

# The first remaining argument is the jar file name, resolved under APP_HOME.
PKG="$1"
JAR_PATH="${APP_HOME}/${PKG}"

# Abort unless the jar is an existing regular file
[[ -f "$JAR_PATH" ]] || error "Jar包不存在: $JAR_PATH"

# Abort unless the properties file is an existing regular file
[[ -f "$PROPERTIES_FILE" ]] || error "配置文件不存在: $PROPERTIES_FILE"

# ==============================================
# Timestamp handling
# ==============================================
# Sets NTIME (epoch milliseconds) and READABLE_TIME ('%F %T' rendering of
# NTIME), either from the command line or from the properties file.
if [ $# -ge 2 ]; then
    # Everything after the jar name is the start time, so an unquoted
    # "YYYY-MM-DD HH:MM:SS" (which arrives as two arguments) still works.
    TIME="${*:2}"
    info "正在转换时间: $TIME"

    # Convert to epoch milliseconds (%3N is a GNU date extension)
    if ! NTIME=$(date -d "$TIME" +%s%3N 2>/dev/null); then
        error "时间格式错误，请使用: YYYY-MM-DD HH:MM:SS"
    fi

    # sed exits 0 even when no line matches, so check that there is a
    # timestamp= line to rewrite instead of reporting a change that never
    # happened.
    if ! grep -q '^timestamp=' "$PROPERTIES_FILE"; then
        error "配置文件中未找到timestamp配置"
    fi

    # Anchored at line start so that only the timestamp key is rewritten
    sed -i "s@^timestamp=.*@timestamp=$NTIME@" "$PROPERTIES_FILE"

    # Convert back to a readable form for verification
    READABLE_TIME=$(date -d "@$((NTIME / 1000))" +'%F %T')
    success "预处理时间已修改为: $READABLE_TIME"
else
    # Read the current timestamp from the properties file
    if ! NTIME=$(awk -F= '/^timestamp=/{print $2}' "$PROPERTIES_FILE"); then
        error "无法从配置文件读取timestamp"
    fi

    # Tolerate a CRLF-terminated properties file
    NTIME="${NTIME%$'\r'}"

    if [ -z "$NTIME" ]; then
        error "配置文件中未找到timestamp配置"
    fi

    # The value comes from a file: bash would evaluate arbitrary text inside
    # $(( )) as an arithmetic expression, so accept plain digits only. This
    # also rejects the multi-line value produced by duplicate timestamp keys.
    if [[ ! "$NTIME" =~ ^[0-9]+$ ]]; then
        error "配置文件中的timestamp不是有效的毫秒时间戳: $NTIME"
    fi

    # 10# forces base 10 so a leading zero is not parsed as octal
    READABLE_TIME=$(date -d "@$((10#$NTIME / 1000))" +'%F %T')
    info "当前预处理时间: $READABLE_TIME"
fi

# ==============================================
# Interactive confirmation
# ==============================================
echo ""
warning "即将启动Flink预处理任务"
# Show the launch summary, one info line per setting
for summary_line in \
    "应用名称: $APP_NAME" \
    "Jar包: $PKG" \
    "预处理时间: $READABLE_TIME" \
    "并行度: $FLINK_PARALLELISM" \
    "JobManager内存: $FLINK_JM_MEMORY" \
    "TaskManager内存: $FLINK_TM_MEMORY"; do
    info "$summary_line"
done
echo ""

# Read a single keystroke into REPLY; anything other than y/Y cancels
read -r -n 1 -p "是否确认启动？(y/N): "
echo ""
case "$REPLY" in
    [Yy]) ;;
    *)
        info "操作已取消"
        exit 0
        ;;
esac

# ==============================================
# Clean up old jobs
# ==============================================

# Read a `yarn application -list` listing on stdin and print the IDs of the
# applications whose Application-Name column is exactly $1.
# The listing is parsed as tab-separated columns padded with blanks; only
# rows whose first column is an application ID are considered, so the
# summary and header lines are skipped.
_filter_yarn_app_ids() {
    awk -F'\t' -v wanted="$1" '
        {
            id = $1
            name = $2
            gsub(/^[ \t]+|[ \t]+$/, "", id)
            gsub(/^[ \t]+|[ \t]+$/, "", name)
            # Appending "" forces a string (not numeric) comparison
            if (id ~ /^application_[0-9]+_[0-9]+$/ && (name "") == (wanted "")) {
                print id
            }
        }
    '
}

echo ""
info "正在检查并清理旧的Yarn任务..."

APP_IDS=""
if ! YARN_LIST=$(yarn application -list 2>/dev/null); then
    # Best effort: a failed listing does not stop the launch, but it is
    # reported instead of being mistaken for "no old jobs".
    warning "无法获取Yarn任务列表，跳过旧任务清理"
else
    # Match the name column exactly; a pattern match on the whole row would
    # also hit applications that merely contain the name somewhere.
    APP_IDS=$(printf '%s\n' "$YARN_LIST" | _filter_yarn_app_ids "$APP_NAME" || true)

    if [ -n "$APP_IDS" ]; then
        TASK_COUNT=$(printf '%s\n' "$APP_IDS" | wc -l)
        warning "发现 $TASK_COUNT 个旧任务，正在清理..."

        # Word splitting is intended here: the IDs contain no blanks or glob
        # characters, and a for loop keeps stdin away from the yarn command.
        for APP_ID in $APP_IDS; do
            info "正在杀死任务: $APP_ID"
            if yarn application -kill "$APP_ID" &>/dev/null; then
                success "任务 $APP_ID 已杀死"
            else
                warning "任务 $APP_ID 杀死失败（可能已完成）"
            fi
        done

        # Give the killed jobs time to release their resources
        info "等待5秒确保任务完全终止..."
        sleep 5
    else
        success "没有发现旧的Yarn任务"
    fi
fi

# ==============================================
# Sync the properties file to every node
# ==============================================

# Print the unique addresses (first column) of the entries in hosts file $1
# that carry at least one host name starting with prefix $2, leaving out any
# address listed in $3 (whitespace-separated; normally this machine's own
# addresses). Text from '#' to the end of a line is ignored.
# Returns 1 without output when the hosts file cannot be read.
_remote_prefix_nodes() {
    local hosts_file="$1" prefix="$2" local_addrs="${3:-}"

    [[ -r "$hosts_file" ]] || return 1

    # Keep the awk -v value on a single line
    local_addrs="${local_addrs//$'\n'/ }"

    awk -v prefix="$prefix" -v locals="$local_addrs" '
        BEGIN {
            count = split(locals, addr)
            for (i = 1; i <= count; i++) {
                is_local[addr[i]] = 1
            }
        }
        {
            sub(/#.*/, "")
            # Whole-address comparison: 10.0.0.1 must not exclude 10.0.0.10
            if (NF < 2 || ($1 in is_local)) {
                next
            }
            for (i = 2; i <= NF; i++) {
                if (index($i, prefix) == 1) {
                    print $1
                    next
                }
            }
        }
    ' "$hosts_file" | sort -u
}

echo ""
info "正在同步配置文件到所有${HOST_PREFIX}节点..."

# Every matching node except this machine, de-duplicated. Both lookups are
# best effort: a failure yields an empty list rather than aborting.
LOCAL_ADDRS=$(hostname -i 2>/dev/null || true)
BIGDATA_NODES=$(_remote_prefix_nodes "$HOSTS_FILE" "$HOST_PREFIX" "$LOCAL_ADDRS" || true)

if [ -z "$BIGDATA_NODES" ]; then
    warning "未发现其他${HOST_PREFIX}节点，跳过配置同步"
else
    NODE_COUNT=$(printf '%s\n' "$BIGDATA_NODES" | wc -l)
    info "发现 $NODE_COUNT 个节点需要同步"

    SUCCESS_COUNT=0
    FAIL_COUNT=0

    # Word splitting is intended here: the addresses contain no blanks or
    # glob characters, and a for loop keeps stdin away from scp.
    for NODE in $BIGDATA_NODES; do
        info "正在同步到 $NODE ..."
        if scp -o ConnectTimeout=5 -o StrictHostKeyChecking=no -r "$PROPERTIES_FILE" "${NODE}:${APP_HOME}/" &>/dev/null; then
            success "节点 $NODE 同步成功"
            SUCCESS_COUNT=$((SUCCESS_COUNT + 1))
        else
            warning "节点 $NODE 同步失败！请检查网络连接和SSH免密登录"
            FAIL_COUNT=$((FAIL_COUNT + 1))
        fi
    done

    echo ""
    if [ "$FAIL_COUNT" -eq 0 ]; then
        success "所有${HOST_PREFIX}节点配置同步完成"
    else
        # A partial failure is reported but does not stop the launch
        warning "配置同步部分失败，成功: $SUCCESS_COUNT，失败: $FAIL_COUNT"
        warning "将继续启动Flink任务，请手动检查失败节点"
    fi
fi

# ==============================================
# Launch the Flink application
# ==============================================
echo ""
info "正在启动Flink应用..."

# The command is tested directly by `if`: appending `|| true` would force
# the exit status to 0 and hide a failed launch, while a bare failing command
# would abort the script under `set -e` before the failure could be reported.
# NOTE(review): -yjm looks like an option of the yarn-cluster CLI; confirm it
# takes effect with `run-application -t yarn-application` (the -D form would
# be jobmanager.memory.process.size).
if flink run-application \
    -t yarn-application \
    -p "$FLINK_PARALLELISM" \
    -yjm "$FLINK_JM_MEMORY" \
    -Dtaskmanager.memory.process.size="$FLINK_TM_MEMORY" \
    -Dyarn.application.name="$APP_NAME" \
    -Dfs.hdfs.hadoopconf=/home/hadoop/hadoop-2.7.7/etc/hadoop \
    -Dfs.hdfs.impl.disable.cache=true \
    -Dyarn.am.failover.enabled=true \
    -Dyarn.am.failover.max-attempts=5 \
    -Dyarn.application-attempts=5 \
    -Drestart-strategy=fixed-delay \
    -Drestart-strategy.fixed-delay.attempts=5 \
    -Drestart-strategy.fixed-delay.delay=5s \
    -c "$FLINK_MAIN_CLASS" \
    "$JAR_PATH" \
    "$PROPERTIES_FILE"; then
    echo ""
    success "=============================================="
    success "Flink应用启动成功！"
    success "应用名称: $APP_NAME"
    success "Jar包: $PKG"
    success "预处理时间: $READABLE_TIME"
    success "并行度: $FLINK_PARALLELISM"
    success "JobManager内存: $FLINK_JM_MEMORY"
    success "TaskManager内存: $FLINK_TM_MEMORY"
    success "同步节点数: ${NODE_COUNT:-0}"
    success "=============================================="

    echo ""
    echo ""
    # Informational listing only: a failure here must not turn a successful
    # launch into a non-zero exit under `set -e` / `pipefail`.
    yarn application -list | awk 'NR>1 {print}' || warning "无法获取Yarn任务列表"
else
    error "Flink应用启动失败！"
fi
