深度学习的实验通常都需要跑很久,如果没有主动推送的手段,那么就会出现以下两种情况:
过一段时间检查一下实验结果,无法专心干别的事情。
撇下不管,实验跑g了也不知道,跑完了也不知道。
为解决以上问题,就需要给实验程序加上监控进程,在实验结束后进行通知并发送实验结果。本文的脚本主要包含三部分内容:进程监控,tensorboard
数据提取,企业微信机器人通知。
进程监控
通常来说,实验放在后台跑会比较安全,不会因为terminal
关闭而停止,放后台跑的命令为:
1 nohup your_command >log_name.log 2>&1 &
进程监控的作用有两个:
当实验异常结束时及时通知
当实验正常结束时通知结果
实现起来比较简单,只要监控进程是否结束就行,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import osfrom utils.wechat_bot import research_botimport timeimport datetimedef process_num (key_words="" ): num_process = int (os.popen(f"ps -ef |grep -v grep|grep {key_words} |wc -l" ).read()) return num_process while True : if process_num() == 0 : now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S" ) send_report_text("实验结束,正在分析结果..." ) break time.sleep(60 )
Tensorboard 数据提取
深度学习的学习过程数据通常由 tensorboard
记录到日志文件中,使用 tensorboard
可以很方便的查看训练数据,但这也降低了对训练数据进行其他操作的便捷性。下面这个脚本可以用来提取
tensorboard
产生的日志数据,并将其写入到 excel
文件中 。
使用时只需调用 tb_to_excel
函数即可,输入格式为:
path_list:字符串构成的列表,每一项为一个日志的路径,如
['logs/0','logs/1']
excel_path:导出的 excel
路径,如
'export.xlsx'
one_sheet(default: False):是否将所有数据合并到一个 excel sheet
中
执行结果:path_list 中的日志文件导出到目标 excel 文件中,path_list
中的每一项对应 excel 文件中的每个 sheet。若
on_sheet=True
,则所有数据将通过 pd.concat
合并到一个 sheet中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import datetimeimport pandas as pdfrom tensorboard.backend.event_processing import event_accumulatorimport osfrom typing import Dict import sysimport collectionsdef get_tb_data (path ): tb_data = event_accumulator.EventAccumulator(path) tb_data.Reload() return tb_data def tb_to_df (tb_data: event_accumulator.EventAccumulator ): keys = tb_data.scalars.Keys() df = pd.DataFrame(columns=keys) for key in keys: data = tb_data.scalars.Items(key) df[key] = pd.Series([item.value for item in data]) if 'step' not in df.columns: df['step' ] = pd.Series([item.step for item in data]) if 'wall_time' not in df.columns: df['wall_time' ] = pd.Series([item.wall_time for item in data]) order = ['step' , 'wall_time' ] + keys df = df[order] return df def get_tb_df (path ): tb_data = get_tb_data(path) df = tb_to_df(tb_data) return df def extract_all_data (path_list:list ) -> Dict [str , pd.DataFrame]: data_dict = {} for filepath in path_list: fn = filepath.split('/' )[-1 ] if os.path.isdir(filepath): print ("读取并处理:" , filepath) sheet_name = fn df = get_tb_df(filepath) data_dict[sheet_name] = df return data_dict def df_to_excel (df_dict, path, one_sheet=False ): if one_sheet: df_all = pd.DataFrame() for df in df_dict.values(): df_all = pd.concat([df_all, df], ignore_index=True ) df_all.to_excel(path, index=False ) else : with pd.ExcelWriter(path) as writer: for sheet_name, df in df_dict.items(): print ("写入表:" , sheet_name) df.to_excel(writer, sheet_name, index=False ) def tb_to_excel (path_list, excel_path, one_sheet=False ): df_dict = extract_all_data(path_list) df_to_excel(df_dict, excel_path, one_sheet)
企业微信机器人通知
主动推送有很多方式可以选择,我个人之前了解过的包括但不限于:
SMTP 发送邮件
Server 酱之类的 Push 服务
QQ 机器人
微信机器人
企业微信应用
企业微信群机器人
这些方式各有其优缺点。实验结果推送的需求为:
实时性好:发送后能立即接受并通知
无响应:只需单向通知,无需设置回复
因此,在衡量了上述需求,以及考虑实现的便捷性,选用企业微信群机器人来作为信息推送的方式。
首先,创建一个企业微信群机器人:
创建一个企业微信群聊(>=3人即可)
把无关的人都踢了
添加一个群机器人
查看机器人信息,在Webhook
地址中找到key
值并记录
然后把key
填入下面脚本中即可使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import requestsfrom urllib3.util.retry import Retryfrom requests.adapters import HTTPAdapterimport hashlibimport base64SLEEP_INTERVAL = 0.1 MAX_RETRIES = 1 retries=Retry( total=MAX_RETRIES, backoff_factor=SLEEP_INTERVAL, status_forcelist=[403 , 500 , 502 , 503 , 504 ], ) default_headers={ "User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 Edg/98.0.1108.62" , "Accept-Encoding" : "gzip, deflate" , 'Content-Type' : 'application/json' } client = requests.Session() client.mount("http://" , HTTPAdapter(max_retries=retries)) client.mount("https://" , HTTPAdapter(max_retries=retries)) client.headers=default_headers BotList = { "实验小助手" :"XXX" } class Bot : def __init__ (self, key ) -> None : self._base_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send" self._key = key self._url = f"{self._base_url} ?key={self._key} " def send_msg (self, msg ): response = client.post(self._url, json=msg) def send_text (self, content ): msg = { "msgtype" : "text" , "text" : { "content" : content } } self.send_msg(msg) def send_image (self, image_path ): with open (image_path, "rb" ) as f: image = f.read() base64_bytes = str (base64.b64encode(image))[2 :-1 ] md5 = hashlib.md5(image).hexdigest() msg = { "msgtype" : "image" , "image" : { "base64" : base64_bytes, "md5" : md5 } } self.send_msg(msg) def send_markdown (self, content ): msg = { "msgtype" : "markdown" , "markdown" : { "content" : content } } self.send_msg(msg) def send_news (self, articles ): msg = { "msgtype" : "news" , "news" : { "articles" : articles } } self.send_msg(msg) research_bot = Bot(BotList["实验小助手" ]) if __name__ == "__main__" : research_bot.send_text("Hello I'm your research assistant" )