分享两个工作中最常用脚本

分享 leeston9 ⋅ 于 2022-05-02 19:40:18 ⋅ 1858 阅读
  1. 多线程查找文件脚本: 可输入参数控制线程数,默认为1
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
import os
import sys
import argparse
import traceback
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

def listAllFiles(dP, fK):
    """Recursively walk ``dP`` and print every file whose name contains ``fK``.

    Directories are descended into; anything that is not a directory is
    treated as a file and reported when ``fK`` is a substring of its base
    name (the last path component). Each hit is printed as
    ``[<thread name>]: --> <path>``.

    Args:
        dP: Path of the directory or file to inspect.
        fK: Keyword that the file name has to contain.
    """
    if os.path.isdir(dP):
        try:
            entries = os.listdir(dP)
        except OSError as err:
            # An unreadable directory (permissions, race with deletion) must
            # not abort the rest of the walk performed by this task.
            print('[WARN]  skip {}: {}'.format(dP, err), file=sys.stderr)
            return
        for entry in entries:
            listAllFiles(dP + '/' + entry, fK)
    elif fK in os.path.basename(dP):
        # basename() also copes with paths that contain no '/' at all, where
        # the former ``dP.rindex('/')`` raised ValueError.
        print('[{}]: --> '.format(threading.current_thread().name) + dP)

def submit_one_thread(task_func, *params):
    """Schedule ``task_func(*params)`` on the module-level ``threadPool``.

    The arguments are forwarded to the executor as they are. The call is no
    longer assembled as a source string and passed to ``eval``: that broke
    for any argument containing a quote and allowed a crafted file name to
    inject code.

    Args:
        task_func: Callable to run on a worker thread.
        *params: Positional arguments passed to ``task_func``.

    Returns:
        The ``Future`` returned by ``threadPool.submit``.
    """
    return threadPool.submit(task_func, *params)

def run():
    """Submit one search task per entry of ``search_path`` and wait for all.

    Reads the module-level ``search_path``, ``keyword``, ``thread`` and
    ``task_list`` and schedules ``listAllFiles`` through
    ``submit_one_thread``. On an unexpected error while scheduling, the
    traceback and the script's help are printed and the process exits.
    """
    if not (search_path and os.path.isdir(search_path)):
        print('[ERROR] search path is not a directory: {}'.format(search_path),
              file=sys.stderr)
        return

    try:
        # Always put exactly one '/' between the base path and the entry, so
        # "-p /data" and "-p /data/" both work (plain concatenation turned
        # "/data" + "foo" into "/datafoo").
        base = str(search_path).rstrip('/')
        for da in os.listdir(search_path):
            task_list.append(submit_one_thread(listAllFiles, base + '/' + da, keyword))
        if len(task_list) < thread:
            print("[WARN]  thread num is {} but only {} thread works !".format(thread, len(task_list)))
        for finished in as_completed(task_list):
            # Surface failures of individual workers instead of dropping them.
            failure = finished.exception()
            if failure is not None:
                print('[WARN]  search task failed: {!r}'.format(failure), file=sys.stderr)

    except Exception:
        traceback.print_exc()
        os.system('python ' + sys.argv[0] + ' -h')
        sys.exit(0)

if __name__ == '__main__':

    # metavar='\b' (a backspace) hides the metavar placeholder in the help.
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--path', metavar='\b', type=str, default=None,
                        help='The searching path scale you need input..   eg : /data/')
    parser.add_argument('-k', '--keyword', type=str, metavar='\b', default=None,
                        help='The searching keyword that filename contains in...   eg: lee')

    parser.add_argument('-t', '--thread', type=int, metavar='\b', default=1,
                        help='how many thread that you need...   eg: 2')

    args, _ = parser.parse_known_args()
    if not args.path or args.keyword is None:
        # Both the path and the keyword are needed; show the usage directly
        # instead of re-running this script with -h in a sub-shell.
        parser.print_help()
        sys.exit(0)
    if args.thread < 1:
        # ThreadPoolExecutor rejects max_workers <= 0 with a ValueError.
        parser.error('thread must be >= 1')

    # Module-level names read by run() and submit_one_thread().
    search_path, keyword, thread, task_list = args.path.replace('\\', '/'), args.keyword, args.thread, []

    # Run the file search on a thread pool; leaving the ``with`` block shuts
    # the pool down.
    with ThreadPoolExecutor(max_workers=thread, thread_name_prefix="Thread") as threadPool:
        run()

可以输入三个参数,第一个是要从哪个目录查询,第二个是要查询的文件名关键字,第三个是线程数,默认为1个线程
file
比如我要查询 / 下面带hadoop 的文件,并指定10个线程
file
结果如下:
file

  2. 第二个脚本:在脚本所在目录下的文件中查找包含指定关键字的文件,并返回其文件名:
    
    #!/bin/bash
# Absolute path of the directory that contains this script. "&&" keeps
# base_path empty (instead of silently using the cwd) when cd fails.
base_path=$(cd "$(dirname "$0")" && pwd)

# Names of the entries directly inside base_path; the path is quoted so that
# directories containing spaces are listed correctly.
res=$(ls "$base_path")
# Holds the grep output for the file that is currently being inspected.
res2=''

# Print, for every regular file directly inside $base_path whose content
# matches the keyword (case-insensitive grep), a red header line with the
# keyword and the file location, followed by the matching lines.
#   $1 - keyword / grep pattern to look for
function get_py_location(){
    local entry
    # Iterate the directory itself rather than word-splitting `ls` output, so
    # names with spaces survive, and test the full path so the result does not
    # depend on the directory the script was started from.
    for entry in "$base_path"/*;
    do
        if [ -f "$entry" ]; then
            # "--" stops option parsing, so a keyword starting with "-" is safe.
            res2=$(grep -i -- "$1" "$entry")
            if [ -n "$res2" ]; then
                # ANSI colour syntax: echo -e "\033[31m red text \033[0m"
                echo -e "\n\n\t\033[31m关键词: $1\t脚本位置: $entry\033[0m\n\n"
                echo "$res2"
            fi
        fi
    done
}

# Entry point: without an argument print the usage hint; with exactly one
# argument search for it and keep only the header lines of script-like files
# (.py/.groovy/.java/.sh, excluding .log).
if [ $# -eq 0 ]; then
  echo "请传入参数:参数是关键词 如 finance_realtime_report"
elif [ $# -eq 1 ]; then
  # Quote the keyword so that one containing spaces reaches the function as a
  # single argument.
  get_py_location "$1" | grep -v "\.log" | grep  -E "\.py|\.groovy|\.java|\.sh" | grep '关键词'
fi

参数只有一个,即文件内容中要查找的关键字,效果如下:
file

补充第二个脚本,python版,功能更强

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import argparse
import traceback
import threading
import io
from concurrent.futures import ThreadPoolExecutor, as_completed

total_c = 0
# Guards total_c: several worker threads update it and ``+=`` on a global is
# not atomic.
_total_c_lock = threading.Lock()

# File-name endings that are never scanned.
_SKIPPED_SUFFIXES = ('jar', 'pyc', 'pyo')


def listAllFiles(dP):
    """Recursively scan ``dP`` and report files whose content matches.

    Every non-directory increments the module-level ``total_c``. Files smaller
    than the module-level ``size`` (KiB) and not ending in jar/pyc/pyo are
    searched with ``grep`` for the module-level ``content``; for each file
    with hits one line listing the matching line numbers is printed.

    ``grep`` is started without a shell, so quotes or shell metacharacters in
    the path or the pattern can neither break nor inject into the command.
    The pattern is applied to the file's own lines; the former ``cat -n``
    pipeline also matched it against the prepended line numbers.

    Args:
        dP: Path of the directory or file to inspect.
    """
    import subprocess  # local import: only this function needs it

    global total_c
    if os.path.isdir(dP):
        try:
            entries = os.listdir(dP)
        except OSError as err:
            # Keep walking the remaining tree when one directory is unreadable.
            print('[WARN]  skip {}: {}'.format(dP, err), file=sys.stderr)
            return
        for entry in entries:
            listAllFiles(dP + '/' + entry)
        return

    with _total_c_lock:
        total_c += 1

    try:
        too_big = os.stat(dP).st_size >= size * 1024
    except OSError:
        # Broken symlink or file removed in the meantime: nothing to search.
        return
    if too_big or str(dP).endswith(_SKIPPED_SUFFIXES):
        return

    # -a: treat binary data as text so hits still yield line numbers,
    # -n: prefix every hit with "<line number>:", -e/--: keep a pattern or
    # path starting with "-" from being parsed as an option.
    proc = subprocess.run(['grep', '-a', '-n', '-e', str(content), '--', dP],
                          stdout=subprocess.PIPE)
    line_numbers = []
    for hit in proc.stdout.split(b'\n'):
        number = hit.split(b':', 1)[0]
        if number.isdigit():
            line_numbers.append(number.decode('ascii'))
    if line_numbers:
        print('[{}]: -> find "{}" in ‖ {} ‖ at line: {}'.format(threading.current_thread().name,
                                                                content, dP,
                                                                ','.join(line_numbers)))

def readFile(filePath, contentKey, fileSize):
    """Look for the first line of ``filePath`` that contains ``contentKey``.

    Args:
        filePath: Path of the text file to scan.
        contentKey: Substring to look for in each line.
        fileSize: Size limit in bytes; only files smaller than this are read.

    Returns:
        ``None`` when the file is ``fileSize`` bytes or larger. Otherwise a
        tuple ``(find, counter)``: ``(True, n)`` with the 1-based number of
        the first matching line, or ``(False, line_count + 1)`` when no line
        matches.
    """
    if os.stat(filePath).st_size >= fileSize:
        return None

    line_no = 0
    # ``with`` closes the handle; errors='replace' keeps a few undecodable
    # bytes from aborting the scan.
    with io.open(filePath, errors='replace') as handle:
        for line_no, row in enumerate(handle, 1):
            if contentKey in row:
                # Later lines cannot change the result, so stop reading here.
                return True, line_no
    return False, line_no + 1

def submit_one_thread(task_func, *params):
    """Schedule ``task_func(*params)`` on the module-level ``threadPool``.

    The arguments are handed to the executor unchanged. Building the call as
    a string for ``eval`` failed for any argument containing a quote and let
    a crafted file name inject code, so the call is made directly.

    Args:
        task_func: Callable to run on a worker thread.
        *params: Positional arguments passed to ``task_func``.

    Returns:
        The ``Future`` returned by ``threadPool.submit``.
    """
    return threadPool.submit(task_func, *params)

def run():
    """Submit one content-search task per entry of ``search_path`` and wait.

    Reads the module-level ``search_path`` and ``thread``, schedules
    ``listAllFiles`` through ``submit_one_thread`` and finally prints the
    number of files visited (``total_c``). On an unexpected error while
    scheduling, the traceback and the script's help are printed and the
    process exits.
    """
    if not (search_path and os.path.isdir(search_path)):
        print('[ERROR] search path is not a directory: {}'.format(search_path),
              file=sys.stderr)
        return

    try:
        task_list = []
        base = str(search_path).rstrip('/')
        for da in os.listdir(search_path):
            task_list.append(submit_one_thread(listAllFiles, base + '/' + da))
        if len(task_list) < thread:
            print("[WARN]  thread num is {} but only {} thread works !".format(thread, len(task_list)))
        for finished in as_completed(task_list):
            # Surface failures of individual workers instead of dropping them.
            failure = finished.exception()
            if failure is not None:
                print('[WARN]  search task failed: {!r}'.format(failure), file=sys.stderr)

    except Exception:
        traceback.print_exc()
        os.system('python ' + sys.argv[0] + ' -h')
        sys.exit(0)
    print('search file total: {}'.format(total_c))

if __name__ == '__main__':

    # metavar='\b' (a backspace) hides the metavar placeholder in the help.
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--path', metavar='\b', type=str, default=None,
                        help='The searching path scale you need input..   eg : /data/')
    parser.add_argument('-c', '--content', type=str, metavar='\b', default=None,
                        help='The content that you want find.   eg: stars')

    parser.add_argument('-t', '--thread', type=int, metavar='\b', default=1,
                        help='how many thread that you need...   eg: 2')

    parser.add_argument('-s', '--size', type=int, metavar='\b', default=1024,
                        help='file size limit in KiBytes (default 1024)...   eg: 100')

    args, _ = parser.parse_known_args()
    if not args.path or args.content is None:
        # Both the path and the content are needed; show the usage directly
        # instead of re-running this script with -h in a sub-shell.
        parser.print_help()
        sys.exit(0)
    if args.thread < 1:
        # ThreadPoolExecutor rejects max_workers <= 0 with a ValueError.
        parser.error('thread must be >= 1')

    # Module-level names read by run() and listAllFiles().
    search_path, content, thread, size = args.path.replace('\\', '/'), args.content, args.thread, args.size

    # Leaving the ``with`` block shuts the pool down once the search is done.
    with ThreadPoolExecutor(max_workers=thread, thread_name_prefix="SearchThread") as threadPool:
        run()

效果:
file

总结: 这两个脚本在工作中非常实用,第一个脚本是优化了find 命令的用法,第二个脚本很适合在包含大量脚本的文件夹中找出哪些脚本包含了某些代码块,我经常会用它来查询一些有相关依赖的脚本。

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-leeston9,http://hainiubl.com/topics/75857
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter