最近没顾上继续折腾 Spring Cloud,而是和 Python 相爱相杀。唉,一开始还小瞧了小小的 Python,没想到还真费了我不少劲呢,前前后后差不多搞了快 2 周,做出来的东西才稍微像那么回事。
起因
最近我的小手机老是提示存储空间不足,有时候连拍照都提示失败。以前也遇到过这样的情况,作为一名老司机,一般会找几个不怎么用的 APP 删掉,这样就可以腾出空间了,可最近这招似乎不太灵了。是的,就是下面这种情况。
后来仔细检查了一遍发现原来是相册太大了,整整有 80 多个 G。
唉,至从有了高像素摄像头之后,越来越多的照片舍不得删掉呀。
可现在没办法了,既然舍不得删掉,那就导出到电脑上吧。首先上场的是我的主力台式老爷机(拥有一颗二代i3处理器),搭载优秀的 Linux 操作系统(不需要到处找激活码),使用网上比较流行的 shotwell 进行照片导入,界面差不多就像下面这样。
shotwell 会自动按照片创建的时间导入到指定目录下。只要能「连上手机」,操作也是相当的简单,不过要想连上手机似乎就有点困难了,比如会出现这样的错误提示。
既然不能直接从设备导入照片,那就换个角度,从文件夹里导入就好了嘛,不成想自己还是太单纯了。经过漫长的等待后又再次弹框。
(此处假装有图片)
唉,行。可能是我的小手机不配高冷的 Linux 呢,那我换个接地气的 Windows 试试看呢?拿出尘封多年的笔记本,连上小手机,选择照片导入。看着熟悉的文件拷贝窗口,不禁感叹到,还是 Windows 懂生活啊。
哎?哎!怎么又报错了!看来大厂的软件也不咋滴嘛。
究竟是手机的矫情,还是电脑的假正经,我是没想明白的,总之他俩就是合不到一块儿。
不就是文件拷贝吗?咋这么难呢?实在不行那就自己动手咯。
自己动手,丰衣足食
老规矩,先看最终效果图,觉得效果还凑合,咱再接着慢慢聊(毕竟后面的废话还挺多)。
这是一个运行在终端上的 Python 脚本,为什么选 Python?因为我觉得小小的照片导入功能,不就是递归拷贝吗?Python 足矣!
初代方案
方案其实挺简单的,按如下步骤执行即可实现照片导入。
- 递归遍历指定目录。
- 若遍历到的是文件且是指定文件类型则获取其 MD5 值,否则跳过该文件。
- 在数据库中查找刚刚获取的 MD5 值,若能找到则跳过,继续执行第二步。
- 不能找到说明这是一个新文件,那么获取其元数据(分辨率、地理信息等)
- 将文件拷贝至指定目录
- 将元数据和 MD5 值一起保存进数据库
- 重复执行第二步,直到遍历结束。
上面看上去步骤挺多,实际代码写出来很少,而且很简单。遍历用os.walk()
即可。
嗯?图片元数据怎么获取?这也不难,利用 PIL 库即可获取到照片的各种元数据。不过在获取的时候可能出现意外情况,比如图片不包含元数据(可能是被清掉了,也可能是图片本身就没有),遇到这种情况就得从文件属性里面获取其它有用信息了(如:文件创建时间)。相关代码请看这里:
from support.utils.file.base import MediaFile
from PIL import Image
from PIL.ExifTags import TAGS
from support.ui.console import Log
class Picture(MediaFile):
KB = 1024
KEY_WIDTH = ['ImageWidth','ExifImageWidth']
KEY_HEIGHT = ['ImageLength','ExifImageHeight']
KEY_GPS = 'GPSInfo'
KEY_DATE = 'DateTimeOriginal'
WIDTH = 'Width'
HEIGHT = 'Height'
DATE = 'Date'
LOAC = 'Local'
HASH = 'Hash'
def __calcDMS(self, value):
return value if len(value)==1 else value[0]/value[1]
def __transDMS(self, value):
if('2' not in value.keys()):
return ""
latitudinal = self.__calcDMS(value[2][0]) + self.__calcDMS(value[2][1])/60.0 + self.__calcDMS(value[2][2]) /3600.0
longitude = self.__calcDMS(value[4][0]) + self.__calcDMS(value[4][1])/60.0 + self.__calcDMS(value[4][2]) /3600.0
return "{},{}".format(longitude if value[3].upper() == 'E' else -longitude, latitudinal if value[1].upper() == 'N' else -latitudinal)
def __hasTag(self, filtermap, key):
if(filtermap is None):
return True
for filter in filtermap:
if(isinstance(filter, str) and key == filter):
return True
if(isinstance(filter, list) and key in filter):
return True
return False
def __transKey(self, key):
if(key == self.KEY_DATE):
return self.DATE
if(key == self.KEY_GPS):
return self.LOAC
if(key in self.KEY_HEIGHT):
return self.HEIGHT
if(key in self.KEY_WIDTH):
return self.WIDTH
return key
def getMetaData(self, path, filtermap=None):
metaData = {}
metaData[self.HASH] = self.calHash(path)
if(self.isVideo(path)):
metaData[self.DATE] = self.getCreateDate(path)
else:
try:
file = Image.open(path)
info = file._getexif()
if(info):
for (tag, value) in info.items():
key = TAGS.get(tag,tag)
if(self.__hasTag(filtermap, key)):
metaData[self.__transKey(key)] = self.__transDMS(value) if(key == self.KEY_GPS) else value
except Exception as e:
Log.e(__file__, type(e), str(e))
finally:
if(self.DATE not in metaData.keys()):
metaData[self.DATE] = self.getCreateDate(path)
return metaData
def getData(self, meta, key, default=""):
if(key not in meta.keys()):
return default
return meta[key]
备注:元数据并不是照片导入的必要信息,只不过我在最开始做的时候,还没想好照片因以什么规则进行自动归类,是按日期?还是按地理信息?
从最初的方案里面可以看出相片的拷贝依赖主线程一个文件一个文件的处理。如果只有少量的数据这样玩儿问题不大,可我有 80 多 G 的照片呀!一个一个拷贝,得到天荒地老吗?
这方案完全就是在玩儿我呀!不行,得优化!
二代方案
既然一个一个文件处理太慢了,那就并行处理呗,咱上多线程。
线程是个好东西,执行效率提升很明显,但是线程的创建是会消耗系统资源的,总量80G的小文件,每个文件都创建一个线程,呵呵~
我们在提高效率的时候,也得注意系统资源的占用情况,所以采用线程池来管理。好在 Python 已经有非常成熟的线程池方案了,直接使用即可。方案如下:
- [new]初始化指定大小的线程池
- 递归遍历指定目录。
- 若遍历到的是文件且是指定文件类型则获取其 MD5 值,否则跳过该文件。
- 在数据库中查找刚刚获取的 MD5 值,若能找到则跳过,继续执行第二步。
- 不能找到说明这是一个新文件,那么获取其元数据(分辨率、地理信息等)
- [new] 从线程池中分配一个线程执行文件拷贝动作
- 如果当前执行队列中的线程数量已达到阈值,则等待至少 1 个线程完成拷贝动作后再继续执行
- 将元数据和 MD5 值一起保存进数据库
- 重复执行第二步,直到遍历结束。
- [new]遍历结束后,等待执行队列中所有线程执行完成。
这里设计 2 个队列来保存任务,是有优化空间的。self.threadPool = ThreadPoolExecutor(maxqueue) self._lock = RLock() self.dashboard = dashboard self.idleQueue = [] self.runningQueue = []
最初的想法是主线程先将任务 push 进空闲队列,当空闲队列中待执行任务数量达到阈值后再去线程池中取线程逐一执行,并将分配的线程 push 进执行队列。防止主线程不停的向线程池要线程,导致线程池创建线程总数远远超过期望数量。
而当执行队列中线程数达到 2 倍阈值就强行等待线程执行完毕,期望通过这种方式让线程池中的线程尽可能利用起来。
可是这个设计存在不可修改的缺陷,首先线程执行完之后是直接回到线程池中的,我所设计的执行队列无法感知到这个变动,那么就会出现某线程实际已处于空闲状态,可是仍然在执行队列当中。其次线程真正开始执行需要等待空闲队列中的任务数量达到阈值,那么在未到达阈值这个时间段内,其实线程未被合理利用,同时由于队列的限制,在实际运行时会出现一批一批的执行,而不是滚动执行。
所以后面就优化成先执行,后判断是否需要等待线程空闲的模式。但是代码上仍然留有之前方案的痕迹(其实就是自己懒得改了)。
def execJobs(self, force=False):
self._lock.acquire()
for task in self.idleQueue:
self.runningQueue.append(self.threadPool.submit(task['func'], task['args']))
self.idleQueue.clear()
while(len(self.runningQueue) >= self.maxqueue):
Log.i(Task.TAG, "waiting for all running thread completed", len(self.runningQueue))
for thread in as_completed(self.runningQueue):
self.runningQueue.remove(thread)
break
if(force):
for thread in as_completed(self.runningQueue):
Log.i(Task.TAG, thread)
self.runningQueue.clear()
self._lock.release()
现在线程有了,执行速度也提上去了,用小手机试试,发现也能正常拷贝完 80 多 G 的照片了,并且能将照片单独保存在以日期命名的各个文件夹当中。
如果事情到这就及时截止,一切都那么美好。可是做完后,我突发奇想,如果能有一个进度条来指示拷贝进展就好了,于是我就掉进了一个大坑当中。
Rich
Rich 是一个 Python 库,可以为您在终端中提供富文本和精美格式。
Rich API 可以很容易的在终端输出添加各种颜色和不同风格。Rich 还可以绘制漂亮的表格,进度条,markdown,突出显示语法的源代码及回溯等等,不胜枚举。
上述均转自 Rich 的 readme 文件,不可否认效果确实挺漂亮的,而且封装非常好。就是文档太少了。国内的相关文档要么把人家的 readme 拷贝一次,要么就是同一篇文章抄来抄去。唉,也许这也是国内技术博客的现状吧。
下文为 rich 踩坑实录。均为个人理解,若与Rich官方文档冲突,以官方为主
layout
既然用上了 rich,我当然希望能更好的使用它强大的显示特性,我不仅仅想要有进度条,还想有状态、日志等相关信息展示。所以这里就需要用到 layout 功能,将屏幕进行分割。
layout 的 API 其实挺简单的,将屏幕按行分割或按列分割,分割之后的每一个区域又可以再按行或列分割。同时可以指定分割后的区域大小,占比等信息。比如我就是这样分割整块屏幕的。
layout = Layout(name="root")
layout.split(
Layout(name="header", size=3),
Layout(name="main", ratio=1),
Layout(name="copyright", size=9),
)
layout["main"].split_row(
Layout(name="info", ratio=2),
Layout(name="progress", ratio=1)
)
layout["progress"].split(
Layout(name="overall", size=3),
Layout(name="jobs", ratio=1)
)
分割好之后就需要向各个区域填充想要显示的控件。rich 支持多种填充控件,我这里主要用到了进度条(progress)、文本显示(panel)。
进度条 Progress
样式
首先需要给 progress 定义显示样式。Rich 封装了几种常用的样式,按需组合即可。在初始化 progress 时即可指定样式:
self.overallProgress = SmartProgress(TextColumn("[progress.description]{task.description}"), CountColumn(), BarColumn(), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"))
self.jobsProgress = SmartProgress(TextColumn("{task.description}", justify="left"),"|",DownloadColumn(),BarColumn(bar_width=None),"|",TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),"|",TimeRemainingColumn())
这里的 SmartProgress 是我二次封装的 progress,但是初始化函数与 Progress 是一致的,参数乍看上去挺复杂的。但是拆解一下就一目了然了。
以第一个 overallProgress 为例。它的参数分别是TextColumn(), CountColumn(), BarColumn(), TextColumn()
。这些参数共同构成了 overallProgress 的显示样式,其排列的顺序就是最终显示的顺序。显示的效果即:
文本 计数器 进度条 文本
而前后两个文本到底显示什么呢?我们继续拆解参数。
第一个 TextColumn 参数为"[progress.description]{task.description}"
,它表示会显示 progeess 的 description 属性,同时也显示自定义属性 task.description 的内容,内容排序以参数中定义的顺序排列。
第二个 TextColumn 参数为"[progress.percentage]{task.percentage:>3.0f}%"
,它表示显示 progress 的 percentage 属性,同时显示自定义 task.percentage 并在最后加上百分号。
这样回头再看 overallProgress 的显示样式会是这样的:
进度描述 计数器 进度条 当前百分比
而第二个 jobsProgress 经过拆解后也能得知其显示样式为:
任务描述 | 下载计数 | 进度条 | 进度百分比 | 预计剩余时间
列
列是 progress 中的术语,多个 progress 在一起的时候,会 Rich 会保持其列对齐。因此列只需要关注自己的显示内容即可。在 progress 参数中出现的 CountColumn 同样是二次封装的列,其代码如下:
from rich.progress import BarColumn, DownloadColumn, Progress, TextColumn, TimeRemainingColumn, ProgressColumn
from typing import Optional
from rich.text import Text, TextType
from rich.table import Column
class CountColumn(ProgressColumn):
def __init__(
self, binary_units: bool = False, table_column: Optional[Column] = None
) -> None:
self.binary_units = binary_units
super().__init__(table_column=table_column)
def render(self, task: "Task") -> Text:
if(task.total is None):
count_status = f"{task.completed}/?"
else:
total = int(task.total)
count_status = f"{task.completed}/{total}"
count_text = Text(count_status, style="progress.download")
return count_text
可以看出列最主要的作用就是返回此时应该显示什么内容。
进度条控制
在上文提到多进度条的时候 Rich 会保证其列对齐。可是我们似乎只初始化了 2 个进度条呢,是指他两列对齐吗?当然不是。先来了解 一下Progress 提供的 API ,然后就明白了。
Progress并不是看到的某一个具体的进度条,而是某一类(相同样式)的集合。通过add_task
的方式可以在其中添加任意条实际会显示的progress。该方法每调用一次就会返回一个taskID,之后可以通过ID操作某一个具体的进度条。
Progress可操作的内容均和其显示的内容有关,常用的如:
- description 进度条的描述信息
- total 进度条总长(默认100)
- completed(当前已完成长度)
- advance(类似追加的概念,每次设置后会在当前completed的基础上再追加advance,并重新设置completed)
- 自定义属性
除上述内容外,还可以获取当前所有的task,遍历每一个task状态等。详细的可参考Rich的源码progress.py。
文本显示 panel
Panel很特殊,它更像是一个“容器”,将另一个控件包裹起来,通常里面会放入Table.grid(网格控件)或上文提到的Progress。下面是一个放入网格的例子:
class CopyrightPanel:
def __init__(self, title="[ Copyright ]", style="bright_blue"):
self.title = title
self.style = style
def __rich__(self) -> Panel:
sponsor_message = Table.grid(padding=1)
sponsor_message.add_column(style="green", justify="right")
sponsor_message.add_column(justify="left")
sponsor_message.add_row(
"Gitee",
"[u blue link=https://gitee.com/ray0728/multimedia-file-synchronizer/tree/release-mfs]https://gitee.com/ray0728/multimedia-file-synchronizer/tree/release-mfs",
)
sponsor_message.add_row(
"Blog",
"[u blue link=https://www.ray0728.cn/]https://www.ray0728.cn/",
)
sponsor_message.add_row(
"Email",
"[u blue link=mailto://51101661@qq.com]51101661@qq.com",
)
intro_message = Text.from_markup("""\
Please provide more comments and bugs! or buy me a coffee to say good job.
- Ray
The UI is developed based on the [bold magenta]Rich[/] component.
[u blue link=https://github.com/sponsors/willmcgugan]https://github.com/sponsors/willmcgugan[/]
""")
message = Table.grid(expand=True)
message.add_column()
message.add_column()
message.add_row(intro_message, sponsor_message)
message_panel = Panel(
message,
title=self.title,
border_style=self.style,
)
return message_panel
这是封装过的panel,提供静态的copyright信息。
装载
准备好progress和panel之后就可以向layout装载这些东西了。
self.logRedirect = LogPanel()
# Log.logRedirect = NullDev()
Log.logRedirect = self.logRedirect
layout["info"].update(self.logRedirect)
layout["copyright"].update(CopyrightPanel())
layout["header"].update(ClockPanel(title, "white on blue"))
layout["overall"].update(Panel(self.overallProgress, title="Overall Progress", border_style="orange4"))
layout["jobs"].update(Panel(self.jobsProgress, title="[ Jobs Progress ]", border_style="green"))
刷新 Live
Rich使用Live来实现控件刷新的,比如progress,上文装载的多个控件都可以通过Live的方式实现控件的定时刷新。
Live刷新的本质
Live刷新是通过根控件,逐一查找其子控件,并根据子控件当前数据以及状态重绘子控件来实现界面刷新。
这个方法被封装在Live的update当中。当Live启动定时线程后,由定时线程去调用update方法。
因为Rich最终是在stdout或自定义file中输出。如果存在多个Live同时刷新界面,那么输出一定是乱序的,所以Rich不允许同时启动多个Live。
实测发现在不同操作系统上Rich表现不一样,Linux上的表现比在Windows上更丰富,且平滑。
Live手动刷新
回到我的相片拷贝业务上来,文件的遍历一定是主线程该干的事儿,文件拷贝是子线程干的事儿,界面刷新则是Live干的事儿。界面主要显示内容应该是拷贝的进度。
因为Live与各拷贝子线程之间并无联系,因此拷贝进度无法通知到Live,并实时刷新界面显示。但如果让拷贝子线程有能力通知并触发Live刷新了,这就又与Live的定时刷新机制相冲突。
幸运的是Live提供了不开启定时刷新的开关,因此不用侵入式修改Rich源码,只需要将Live的相关动作封装一下,就可以在不使用定时刷新线程的情况下,手动刷新界面了。
def show(self):
self.live = Live(self.layout, refresh_per_second=1, screen=True, auto_refresh=False)
#self.live = Live(self.layout, refresh_per_second=4, screen=True)
self.live.start(True)
def update(self):
self._lock.acquire()
if(self.live is not None):
self.live.update(self.layout)
self._lock.release()
def hiden(self):
if(self.live is not None):
self.live.stop()
何时刷新
一般来说可以在任意时候调用,不过为了让调用更优雅,我将刷新调用逻辑封装在了线程task.py当中,简单来说子线程在必要的时候可回调Live刷新接口。
最终效果
虽然看上去还行,但是代码还是有些小毛病,比如刷新不够平滑,进度条有时候会有跳变,如果有好心人发现了bug,也非常欢迎提交修改合入。
相关源码已在Gitee开源