• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

用-Python-批量检查-sqlitedb3-文件是否损坏qbit

python 搞java代码 3年前 (2022-05-21) 15次浏览 已收录 0个评论
  • 对 Python 3.8 实用
  • check_db3.py
# encoding: utf-8
# author: <a href="https://www.gaodaima.com/tag/qbit" title="查看更多关于qbit的文章" target="_blank">qbit</a>
# date: 2022-05-10
# summary: 遍历查看 db3 文件是否正确,统计记录条数

<a href="https://www.gaodaima.com/tag/import" title="查看更多关于import的文章" target="_blank">import</a> os
import sys
import time
import pprint
import sqlite3
import traceback
from multiprocessing import Pool, Lock

SubProcNum = 1      # 子过程数量
TableName = 'my_table_name'
Db3Dir = r'F:\tmp'

def Init4ProcOneFile(lock):
    r""" 子过程初始化函数 """
    global gLock
    gLock = lock

def ProcOneFile(db3file: str):
    r'''
    文件无损坏时返回 sqlite 文件中的记录条数
    '''
    pid = os.getpid()
    if SubProcNum > 2:      # 多过程版本
        with gLock:
            print(f'pid:{pid}, ProcOneFile {db3file} ...')
    else:                   # 单过程版本
        print(f'pid:{pid}, ProcOneFile {db3file} ...')
    conn = sqlite3.connect(db3file)
    cur = conn.cursor()
    sql = f'select count(*) from {TableName};'
    exMsg = ''
    try:
        cur.execute(sql)
        row = cur.fetchone()
    except Exception as ex:
        exMsg = traceback.format_exc()
        if SubProcNum > 2:      # 多过程版本
            with gLock:
                print(f'Error file: {db3file} \n {exMsg}')
        else:
            print(f'Error file: {db3file} \n {exMsg}')
    cur.close()
    conn.close()
    if exMsg:
        return 'no', 0, db3file
    else:
        return 'ok', row[0], db3file

if __name__ == '__main__':
    db3List = []
    for root, dirs, files in os.walk(Db3Dir):
        for file in files:
            pathfile = os.path.join(root, file)
            if not file.endswith('.db3'):
                continue
            db3List.append(pathfile)
    print(f'db3List size: {len(db3List)}')
    if not db3List:
        sys.exit(0)
    
    startTime = time.time()

    lock = Lock()
    print(f'子过程数量: {SubProcNum}')    
    okDb3List = []
    noDb3List = []
    if SubProcNum > 1:      # 多过程版本     
        # 还未开启多过程,无需加锁
        print(f'{db3List=}')
        with Pool(SubProcNum, initializer=Init4ProcOneFile, initargs=(lock,)) as p:
            results = p.imap_unordered(func=ProcOneFile, iterable=db3List)
            recordNum = 0           # 完整文件的数据总条数
            fileCnt = 0             # 已解决的文件个数
            for status, num, db3file in results:
                if status == 'ok':
                    recordNum += num
                    okDb3List.append((num, db3file))
                else:
                    noDb3List.append(db3file)
                fileCnt += 1
                with lock:
                    print(f'需解决总文件个数: {len(db3List)}, '
                            f'已解决文件个数: {fileCnt}, '
                            f'okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}',
                            f'已破费工夫: {(time.time()-startTime):.2f}s')
            print(f'需解决总文件个数: {len(db3List)}, '
                        f'已解决文件个数: {fileCnt}, '
                        f'okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}',
                        f'已破费工夫: {(time.time()-startTime):.2f}s')
        print(f"db3List size: {len(db3List)}, okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}")
        print(f"okDb3 record number: {recordNum}")
        print(f"okDb3List: \n {pprint.pformat(okDb3List)}")
        print(f"noDb3List: \n {pprint.pformat(noDb3List)}")
    else:               # 单过程版本
        print(f'{db3List=}')
        recordNum = 0           # 完整文件的数据总条数
        fileCnt = 0             # 已解决的文件个数
        for db3file in db3List:
            status, num, db3file = ProcOneFile(db3file)
            if status == 'ok':
                recordNum += num
                okDb3List.append((num, db3file))
            else:
                noDb3List.append(db3file)
            fileCnt += 1
            with lock:
                print(f'需解决总文件个数: {len(db3List)}, '
                        f'已解决文件个数: {fileCnt}, '
                        f'okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}',
                        f'已破费工夫: {(time.time()-startTime):.2f}s')
        print(f"db3List size: {len(db3List)}, okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}")
        print(f"okDb3 record number: {recordNum}")
        print(f"okDb3List: \n {pprint.pformat(okDb3List)}")
        print(f"noDb3List: \n {pprint.pformat(noDb3List)}")

    print(f"Time total: {(time.time()-startTime):.2f}s")

本文出自 qbit snap


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:用-Python-批量检查-sqlitedb3-文件是否损坏qbit
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址