
An Example MongoDB Split-and-Import Script (with Data Cleanup)

What the script does:

      1. Split the specified report file by the given key field according to the database/collection sharding strategy.

      2. Import the split files into the corresponding MongoDB instances concurrently.

      3. Produce log files and "done" marker files.

Usage:

     -h    print help information and exit

     -f    data file to split

     -g    clean up yesterday's data or all historical data: 1 = yesterday's data, 2 = all historical data

     -k    column number (1-based) of the split key field in the file

     -o    format of the data file to split: tsv or csv

     -d    number of databases to split into

     -t    number of collections to split into

     -m    un-split MongoDB database name to import into, e.g. cpc before splitting becomes cpc_01 after splitting

     -c    un-split MongoDB collection name to import into, e.g. cpc before splitting becomes cpc_0102 after splitting

     -a    fieldFile used for the import

     -p    configuration file
 
How to use it:

     1. In the configuration file, set the log directory $LOG_HOME and the temporary directory for split data $DATA_SPLIT_HOME; create them manually if they do not exist.

        In the configuration file, set the connection parameters of the target MongoDB instances that the data will be imported into.

        In the configuration file, set $MONGO, the home directory of the MongoDB binaries.

     2. Run the script in the format below, filling in the parameters as described (a small sketch of how a key value is routed to a database/collection suffix follows this example):

        Example: ./mongo-split-importer.sh -f /data/shell/test.ata -g 1 -o tsv -k 3 -d 3 -t 3 -m idea -c idea -p ../conf/demeter_conf_qa.sh -a ../conf/idea-head-file

        -f file to split; -o file format, tsv; -k split key, the third column; -d split into 3 databases; -t 3 collections per database;

        -m un-split MongoDB database name to import into, idea; -c un-split collection name to import into, idea; -p environment configuration file; -a fieldFile of the target collections; -g clean up yesterday's data
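
Before looking at the script itself, here is a minimal stand-alone sketch of the routing rule the example above relies on (key column value modulo the database count, then the quotient modulo the collection count). The key value, database name and counts below are illustrative assumptions, not taken from real data:

#!/bin/bash
# illustrative routing example: -d 3 databases, -t 3 collections per database
key=7      # value taken from the split-key column (-k)
dbnum=3    # -d
tbnum=3    # -t
DBN=$(( key % dbnum + 1 ))            # database index, 1..dbnum
TBN=$(( key / dbnum % tbnum + 1 ))    # collection index, 1..tbnum
echo "key $key -> database idea_0$DBN, collection idea_0${DBN}0$TBN"
# prints: key 7 -> database idea_02, collection idea_0203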
 
The mongo-split-importer.sh script:
 
#!/bin/bash

SPLITFILE=""        # file to split
FILEFORMAT=""       # field separator of the file to split: "," or "\t"
FILEFORMATNAME=""   # format name of the file to split: csv or tsv
SPLITKEY=1          # column number (1-based) of the split key field
SPLITDBNUM=""       # number of databases to split into
SPLITTBNUM=""       # number of collections to split into
IMPORTDBNAME=""     # un-split target database name for the import
IMPORTTBNAME=""     # un-split target collection name for the import
PROFILE=""          # configuration file
FIELDFILE=""        # fieldFile used by the import
CLEAN=0             # cleanup flag: 0 = no cleanup (default), 1 = clean yesterday's data, 2 = clean all previous data
SPILTTMPDIR=""      # temporary directory holding the split files
FULLPATH=$(cd `dirname $0`;pwd -P)
SCRIPTFILE=`basename $0`
TOTLE_RECORD_NUM=0      # number of records before splitting
SUBFILE_RECORD_NUM=0    # total number of records across all split files
_mongo_count="-1"
 
#------------------------------------------------ functions ---------------------------------------------------------------
 
function usage(){
        echo "$SCRIPTFILE - split data into multiple databases/collections and import it into mongodb"
        echo "SYNOPSIS"
        echo "OPTIONS"
        echo "  -h    print help information and exit";
        echo "  -f    data file to split";
        echo "  -g    whether to clean up historical data, default no cleanup; 1: clean yesterday's data  2: clean all previous data";
        echo "  -k    column number (1-based) of the split key field in the file";
        echo "  -o    format of the data file to split: tsv or csv";
        echo "  -d    number of databases to split into";
        echo "  -t    number of collections to split into";
        echo "  -m    un-split mongodb database name to import into, e.g. cpc before splitting, cpc_01 after splitting";
        echo "  -c    un-split mongodb collection name to import into, e.g. cpc before splitting, cpc_0102 after splitting";
        echo "  -a    fieldFile used for the import";
        echo "  -p    configuration file, absolute or relative path";
        exit
}
 
function setFileFormat(){
        FILEFORMATNAME=$1
        case $1
        in
                csv)  FILEFORMAT=",";;
                tsv)  FILEFORMAT="\t";;
                *) echo "unknown file format -o $1"; usage;;
        esac
}
 
while getopts ':hf:g:o:k:d:t:a:p:m:c:' OPTION
do
        case $OPTION
        in
                h) usage;;
                f) SPLITFILE=$OPTARG;;
                g) CLEAN=$OPTARG;;
                o) setFileFormat $OPTARG;;
                k) SPLITKEY=$OPTARG;;
                d) SPLITDBNUM=$OPTARG;;
                t) SPLITTBNUM=$OPTARG;;
                a) FIELDFILE=$OPTARG;;
                p) PROFILE=$OPTARG;;
                m) IMPORTDBNAME=$OPTARG;;
                c) IMPORTTBNAME=$OPTARG;;
                :) echo "option \"-$OPTARG\" is missing its value, the default will be used";;
                \?) echo "invalid option -$OPTARG, exiting"; usage;;
        esac
done
 
# write a log message to stdout and to the log file
function logInfo(){
        echo "[`date +"%Y-%m-%d %H:%M:%S"`] $@ " | tee -a $LOGFILE
}

# abort if the previous command failed, logging the message passed in $1
function checkError(){
        if [ $? -ne 0 ]; then
                echo "[`date +"%Y-%m-%d %H:%M:%S,%s"`][$SCRIPTFILE, $$] ERROR OCCURS! - $1" | tee -a $ERRORFILE
                exit 1;
        fi
}
 
# wait (polling via rsync) until the remote done file for the report exists;
# $reportDoneFile, $TABLE and $max_interval are expected to come from the configuration file
function check_ready() {
        tmp_done_file=`printf "$reportDoneFile" "$TABLE" "$1"`
        local isok="false"
        local time_start=`date +%s`
        while [ "$isok" = "false" ]; do
                rsync --list-only ${tmp_done_file}
                if [ $? -eq 0 ]; then
                        isok="true";
                        break;
                fi
                if [ "$isok" = "false" ]; then
                        sleep 300
                fi
                time_now=`date +%s`
                if [ `expr ${time_now} - ${time_start}` -ge $max_interval ]; then
                        return 255;
                fi
        done
        return 0;
}
 
# pick the primary from the host list of database shard $1;
# TARGET_MONGO_HOST_LIST_0<n> comes from the configuration file
function selectMongoMaster(){
                tmp="TARGET_MONGO_HOST_LIST_0$1"
                TMP_HOST=${!tmp}
                echo $TMP_HOST
                # replica set: the command that probes each host for the primary is
                # missing from this listing; the loop keeps the first host that answers
                for DUBHE_MONGO_HOST in $TMP_HOST; do
                        if [ $? -eq 0 ] ; then
                                break;
                        fi
                done
                # single server
                #for DUBHE_MONGO_HOST in $TMP_HOST; do
                        #TARGET_MONGO_HOST=$DUBHE_MONGO_HOST
                        #echo $TARGET_MONGO_HOST
                #done
}
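
# A sketch of the primary-detection step that the loop above appears to be
# missing, written with the legacy mongo shell; the host handling and the use
# of db.isMaster() here are assumptions, not part of the original script:
#
#   for DUBHE_MONGO_HOST in $TMP_HOST; do
#           is_master=`$MONGO/bin/mongo --quiet $DUBHE_MONGO_HOST/admin --eval 'db.isMaster().ismaster'`
#           if [ "$is_master" = "true" ]; then
#                   TARGET_MONGO_HOST=$DUBHE_MONGO_HOST
#                   break
#           fi
#   done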
 
# split the input file into per-database/per-collection files
function split() {
        logInfo "split data file"
        echo "split db num "$SPLITDBNUM
        echo "split tb num "$SPLITTBNUM
        echo "Start to split file: "$SPLITFILE
        awk '
        BEGIN {
                FS="'${FILEFORMAT}'";
        }
        ARGIND==1{
                # route each record to a database and a collection
                DBN=$'${SPLITKEY}' % '${SPLITDBNUM}' + 1;
                TBN=int($'${SPLITKEY}' / '${SPLITDBNUM}')
                TBN=TBN % '${SPLITTBNUM}' + 1;
                DBN="0"DBN;
                TBN="0"TBN;
                print $0 > "'${SPILTTMPDIR}'""/""'${IMPORTTBNAME}'""_"DBN""TBN
        }
        END {
        }
        ' ${SPLITFILE};
        ls $SPILTTMPDIR
        echo "Split file successfully : "$SPLITFILE
}
 
# import the data: one background worker per database
function import() {
        local iter=1;
        while [ $iter -le $SPLITDBNUM ]; do
                thread_import $iter &
                iter=`expr $iter + 1`
        done
        # wait for the child processes
        wait;
}
 
# import worker for database shard $1: iterates over that database's collections
function thread_import() {
        local num=1;
        while [ $num -le $SPLITTBNUM ]; do    # loop header reconstructed; the listing lost it but keeps the matching "done"
                targetFileName=$IMPORTTBNAME"_0"$1"0"$num
                targetFile=$SPILTTMPDIR/$IMPORTTBNAME"_0"$1"0"$num
                targetDB=$IMPORTDBNAME"_0"$1
                targetCollection=$IMPORTTBNAME"_0"$1"0"$num
                if [ ! -f $targetFile ]; then
                        logInfo "split file does not exist : " $targetFile
                        num=`expr $num + 1`
                        continue
                fi
                user="TARGET_MONGO_USER_0"$1
                TMP_USER=${!user}
                password="TARGET_MONGO_PWD_0"$1
                TMP_PASSWORD=${!password}
                # pick the primary of this shard
                selectMongoMaster $1;
                # clean dirty data
                if [ $CLEAN -gt 0 ]; then
                        logInfo "$qdate $targetDB.$targetCollection cleaning up dirty data in mongodb"
                        clean_dirty_data
                        checkError "whether error occurs during cleaning dirty data from mongodb"
                fi
                # import data
                import2mongo $1 $targetFile $targetDB $targetCollection
                # record done file
                statusfile="$STATUS_LOG_HOME/$targetFileName.done.`date -d $qdate +"%Y-%m-%d"`"
                touch $statusfile
                num=`expr $num + 1`
        done
        logInfo "thread $1 ends"
}
 
# import the given file into the given database/collection and create the indexes;
# mongodb itself checks whether an index already exists and only creates it when it does not
function import2mongo(){
        if [ "$FIELDFILE" != "" ]; then
                MONGO_FIELD_FILE=$FIELDFILE
        else
                MONGO_FIELD_FILE=$FULLPATH/../conf/${IMPORTTBNAME}-head-file
        fi
        DATAFILE=$2
        DB=$3           # target database   (reconstructed from the call site in thread_import)
        COLL=$4         # target collection (reconstructed from the call site in thread_import)
        if [ ! -f $DATAFILE ]; then
                logInfo "mongodb [${DB}.${COLL}] imported 0 objects"
                return 0
        fi
        TMPLOGFILE=$INFO_LOG_HOME/$DB.$COLL.tmp.log
        # NOTE: the actual mongoimport invocation is missing from this listing;
        # a hedged sketch of what it typically looks like follows the script below
        tmp=$?
        if [ "$tmp" != "0" ]; then
                return $tmp
        fi
        # data check: count the imported records from the mongoimport log
        _mongo_count=`tail $TMPLOGFILE | grep imported`
        _mongo_count=`expr 0$_mongo_count + 0`
        # start to ensure index
        ensureIndex
        logInfo "mongodb [${DB}.${COLL}] imported $_mongo_count objects"
        return $tmp
}
 
# placeholder: the index-creation commands were omitted from the original listing
function ensureIndex(){
        :
}
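
# A sketch of what ensureIndex might run via the legacy mongo shell; the field
# name and the lack of authentication options are assumptions for illustration:
#
#   $MONGO/bin/mongo --quiet $DUBHE_MONGO_HOST/$DB \
#           --eval "db.getCollection('$COLL').createIndex({some_field: 1})"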
 
# clean up dirty (previously imported) data
function clean_dirty_data(){
        day=`date -d ${1:-' -1day'} +"%y%m%d"`
        if [ $CLEAN -eq 1 ]; then
                _mongo_condition="{\"_id\":{\"\$gte\":\"${day}_0\",\"\$lte\":\"${day}_9\"}}"
        else
                _mongo_condition="{\"_id\":{\"\$lte\":\"${day}_9\"}}"
        fi
        logInfo "waiting for the clean task.."
        echo $_mongo_condition
        # NOTE: the command that actually removes the documents matching $_mongo_condition
        # is missing from this listing; a hedged sketch follows this function
        tmp=$?
        if [ "$tmp" != "0" ]; then
                return $tmp
        fi
        sleep 5s
        logInfo "dirty data cleaned: "$targetDB $targetCollection $dirtyCount
        echo "dirty data cleaned: "$targetDB $targetCollection $dirtyCount
        return $tmp
}
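
# A sketch of the omitted deletion step, again via the legacy mongo shell; only
# $_mongo_condition, $targetDB and $targetCollection come from the script, the
# rest is an assumption:
#
#   dirtyCount=`$MONGO/bin/mongo --quiet $DUBHE_MONGO_HOST/$targetDB --eval \
#           "var r = db.getCollection('$targetCollection').remove($_mongo_condition); print(r.nRemoved)"`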
 
# parameter check
function checkParams() {
        if [ 0 -ne $CLEAN -a 1 -ne $CLEAN -a 2 -ne $CLEAN ]; then
                logInfo "-g the parameter clean is not in [0, 1, 2] : "$CLEAN
                return 1;
        fi
        if [ "$FILEFORMAT" != "," -a "$FILEFORMAT" != "\t" ]; then
                logInfo "-o the parameter file format is not in [csv, tsv] : "$FILEFORMAT
                return 1;
        fi
        if [ $SPLITKEY -lt 1 ]; then
                logInfo "-k split key must not be less than 1 : "$SPLITKEY
                return 1;
        fi
        if [ $SPLITDBNUM -lt 1 ]; then
                logInfo "-d database number must not be less than 1 : "$SPLITDBNUM
                return 1;
        fi
        if [ $SPLITTBNUM -lt 1 ]; then
                logInfo "-t collection number must not be less than 1 : "$SPLITTBNUM
                return 1;
        fi
        if [ ! -f $FIELDFILE ]; then
                logInfo "-a field file is not a regular file or does not exist : "$FIELDFILE
                return 1;
        fi
        if [ "" = "$IMPORTDBNAME" ] ; then
                logInfo "-m import database name is empty : "$IMPORTDBNAME
                return 1;
        fi
        if [ "" = "$IMPORTTBNAME" ] ; then
                logInfo "-c import collection name is empty : "$IMPORTTBNAME
                return 1;
        fi
}
 
# main function
function main() {
                set +x
                echo "check split file and profile: " $SPLITFILE $PROFILE
                if [ ! -f $SPLITFILE ]; then
                        echo "-f split file is not a regular file or does not exist : "$SPLITFILE
                        return 1;
                fi
                if [ ! -f $PROFILE ]; then
                        echo "-p profile is not a regular file or does not exist : "$PROFILE
                        return 1;
                fi
                source $PROFILE
                qdate=`date +"%Y-%m-%d"`
                last_day=`date -d "-1day" +"%Y-%m-%d"`
                BASEFILENAME=$(basename $SPLITFILE)
                echo "base split file name is : "$BASEFILENAME
                if [ ! -d $LOG_HOME ] ; then
                        logInfo " log home is not a directory or does not exist : "$LOG_HOME
                        return 1;
                fi
                LOGFILE=$INFO_LOG_HOME/$BASEFILENAME.$qdate.log
                if [ -f $LOGFILE ]; then
                        mv $LOGFILE $LOGFILE.$last_day
                fi
                touch $LOGFILE
                ERRORFILE=$ERROR_LOG_HOME/$BASEFILENAME.error.log
                if [ -f $ERRORFILE ]; then
                        mv $ERRORFILE $ERRORFILE.$last_day
                fi
                touch $ERRORFILE
                # blank lines
                echo
                echo
                logInfo "start to check parameters!"
                checkParams
                checkError "whether error occurs during check parameters : $SPLITFILE"
                # blank lines
                echo
                echo
                logInfo "start to split file: "$SPLITFILE
                if [ ! -d $DATA_SPLIT_HOME ] ; then
                        logInfo " data split home is not a directory or does not exist : "$DATA_SPLIT_HOME
                        return 1;
                fi
                SPILTTMPDIR=$DATA_SPLIT_HOME/$BASEFILENAME
                echo "split temporary directory : "$SPILTTMPDIR
                if [ -d ${SPILTTMPDIR} ]; then
                        rm -rf ${SPILTTMPDIR}
                fi
                mkdir -p ${SPILTTMPDIR}
                split
                checkError "whether error occurs during split data : $SPLITFILE"
                logInfo "split data completely : $SPLITFILE"
                statusfile=$STATUS_LOG_HOME/$BASEFILENAME".split.done."$qdate
                touch ${statusfile}
                # blank lines
                echo
                echo
                logInfo "start to import split file to mongodb"
                import
                logInfo "import data completely : $SPLITFILE"
                statusfile=$STATUS_LOG_HOME/$BASEFILENAME".import.done."$qdate
                touch ${statusfile}
                # blank lines
                echo
                echo
                # remove temporary directory
        #       if [ -d ${SPILTTMPDIR} ]; then
        #               rm -rf ${SPILTTMPDIR}
        #       fi
}
 
#------------------------------------------------- entry point ----------------------------------------------------------------
source /etc/profile
main    # the call to main is missing from the original listing; without it the script does nothing
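
Note that the body of import2mongo above never shows the command that actually loads the data; that line was lost in this copy. A minimal sketch of the step, based on the standard mongoimport tool and the variables the function already prepares, is given below; the exact flags (especially the authentication handling) are assumptions and should be checked against the MongoDB version in use:

# hypothetical reconstruction of the import step inside import2mongo
$MONGO/bin/mongoimport --host $DUBHE_MONGO_HOST \
        -u "$TMP_USER" -p "$TMP_PASSWORD" \
        -d $DB -c $COLL \
        --type $FILEFORMATNAME --file $DATAFILE \
        --fieldFile $MONGO_FIELD_FILE > $TMPLOGFILE 2>&1
tmp=$?   # the existing code then checks $tmp and greps $TMPLOGFILE for the "imported" line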
 
The demeter_conf_cpc_qa.sh script:
 
#!/bin/bash
source /etc/profile

# logger path
# NOTE: LOG_HOME itself must be set here (the original listing omits the line);
# it is the log root directory referred to in step 1 of the usage instructions
ERROR_LOG_HOME="${LOG_HOME}/error"     # reconstructed: referenced below and by the main script
INFO_LOG_HOME="${LOG_HOME}/info"
STATUS_LOG_HOME="${LOG_HOME}/status"
if [ ! -d $ERROR_LOG_HOME ]; then
        mkdir -p $ERROR_LOG_HOME
fi
if [ ! -d $INFO_LOG_HOME ]; then
        mkdir -p $INFO_LOG_HOME
fi
if [ ! -d $STATUS_LOG_HOME ]; then
        mkdir -p $STATUS_LOG_HOME
fi
if [ ! -d $DATA_HOME ]; then
        mkdir -p $DATA_HOME
fi

# data path for source and target data
DATA_SPLIT_HOME=/data/demeter/sdata

# import target mongodbs
TARGET_MONGO_PORT_01=XXX
TARGET_MONGO_USER_01=XXX
TARGET_MONGO_PWD_01=XXX
TARGET_MONGO_HOST_LIST_01="test01.mongodb01:$TARGET_MONGO_PORT_01 test01.mongodb02:$TARGET_MONGO_PORT_01 test01.mongodb03:$TARGET_MONGO_PORT_01"

TARGET_MONGO_PORT_02=XXX
TARGET_MONGO_USER_02=XXX
TARGET_MONGO_PWD_02=XXX
TARGET_MONGO_HOST_LIST_02="test02.mongodb01:$TARGET_MONGO_PORT_02 test02.mongodb02:$TARGET_MONGO_PORT_02 test02.mongodb03:$TARGET_MONGO_PORT_02"

TARGET_MONGO_PORT_03=XXX
TARGET_MONGO_USER_03=XXX
TARGET_MONGO_PWD_03=XXX
TARGET_MONGO_HOST_LIST_03="test03.mongodb01:$TARGET_MONGO_PORT_03 test03.mongodb02:$TARGET_MONGO_PORT_03 test03.mongodb03:$TARGET_MONGO_PORT_03"

# mongodb utils
MONGO=/opt/mongodb
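
The importer resolves these per-shard settings with bash indirect expansion: it assembles the variable name from the shard index and dereferences it with ${!name}. A small self-contained illustration of that pattern (placeholder values, not the real configuration):

#!/bin/bash
TARGET_MONGO_HOST_LIST_01="hostA:27017 hostB:27017"   # placeholder
shard=1
tmp="TARGET_MONGO_HOST_LIST_0$shard"
TMP_HOST=${!tmp}      # indirect expansion: the value of the variable named by $tmp
echo "$TMP_HOST"      # -> hostA:27017 hostB:27017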
 
The xuri-cpc-head-file fieldFile (one field name per line):
 
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
s
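
The head-file lists one field name per line, in the same order as the columns of the TSV file; mongoimport pairs each column value with the corresponding name. A small hypothetical illustration (two columns only, not the real data):

# fieldFile with two lines "a" and "b", and a TSV data line "x<TAB>y",
# would produce a document roughly like:
#   { "a": "x", "b": "y" }   (plus an auto-generated _id unless a column is named _id)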
 
hosts file entries:

XX.XX.XX.XX  test01.mongodb01
XX.XX.XX.XX  test01.mongodb02
XX.XX.XX.XX  test01.mongodb03
XX.XX.XX.XX  test02.mongodb01
XX.XX.XX.XX  test02.mongodb02
XX.XX.XX.XX  test02.mongodb03
XX.XX.XX.XX  test03.mongodb01
XX.XX.XX.XX  test03.mongodb02
XX.XX.XX.XX  test03.mongodb03
 
 
