# -*- coding:utf-8 -*- import sys import re import json import websockets import asyncio import warnings warnings.filterwarnings('ignore', '"@coroutine"', category=DeprecationWarning) def sec2hms(sec):# 时间转换 hms = str(int(sec//3600)).zfill(2)+':' + \ str(int((sec % 3600)//60)).zfill(2)+':'+str(round(sec % 60, 2)).zfill(2) return hms timeDanmaku = 8 # 普通弹幕持续时间,默认8秒 limitLineAmount = 12 # 屏上弹幕行数限制 danmakuPassageway = [] # 塞弹幕用,记录每行上一条弹幕的消失时间 title = path = f = None vposOffset = 0 # 起始时间与epoch的时间差 for i in range(limitLineAmount): danmakuPassageway.append(0) fontName = 'Source Han Sans JP' # 字体自己换 videoWidth = 1280 # 视频宽度,按720P处理了后面的内容,不建议改 videoHeight = 720 # 视频高度 fontSize = 58 head = '[Script Info]\n\ ; Script generated by Aegisub 3.2.2\n\ ; http://www.aegisub.org/\n\ ScriptType: v4.00+\n\ PlayResX: 1280\n\ PlayResY: 720\n\ \n\ [V4+ Styles]\n\ Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, marginL, marginR, marginV, Encoding\n\ Style: Default,微软雅黑,54,&H00FFFFFF,&H00FFFFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,0,2,0,0,0,0\n\ Style: Alternate,微软雅黑,36,&H00FFFFFF,&H00FFFFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,0,2,0,0,0,0\n\ Style: Danmaku,'+fontName+','+str(fontSize)+',&H00FFFFFF,&H00FFFFFF,&H00000000,&H00000000,-1,0,0,0,100,100,2,0,1,1.5,0,2,0,0,10,0\n\n\ [Events]\n\ Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n' def start_writing(): global f, path f = open(path,'w',encoding='utf-8-sig') f.write(head) def process_line(json_line, live=False): global title, vposOffset try: message = json.loads(json_line) except json.JSONDecodeError: print('error: '+json_line) return if message['kind'] == 10 and 'live_session_id' in message: if not title: title = message['title'] vposOffset = message['last_item_updated'] start_writing() if live: print('已开播 '+sec2hms(message['elapsed'])) return elif message['kind'] == 0 and 'user_id' in message: vpos = message['created']-vposOffset vpos_end = vpos+timeDanmaku else: return text=message['name'] +'[Lv.{}]: '.format(message['level'])+message['body'] vpos_next_min = float('inf') vpos_next = vpos+1280/(len(text)*60+1280) * 8 for i in range(limitLineAmount): if vpos_next >= danmakuPassageway[i]: passageway_index = i danmakuPassageway[i] = vpos+8 break elif danmakuPassageway[i] < vpos_next_min: vpos_next_min = danmakuPassageway[i] Passageway_min = i if i == limitLineAmount-1 and vpos_next < vpos_next_min: passageway_index = Passageway_min danmakuPassageway[Passageway_min] = vpos+8 # 计算弹幕位置 sx = videoWidth sy = fontSize*(passageway_index) ex = 0 for i in text: if re.search("[A-Za-z 0-9',.]",i): ex = ex-30 else: ex = ex-60 ey = fontSize*(passageway_index) f.write('Dialogue: 0,'+sec2hms(vpos)+','+ sec2hms(vpos_end) + ',Danmaku,'+message['name'].replace(',','')+',0,0,0,,{\\an7\\move('+str(sx)+','+str(sy)+','+str(ex)+','+str(ey)+')}'+text+'\n') @asyncio.coroutine def shutdown(): if f: f.close() print(title+'的弹幕已经存为'+path) yield loop = asyncio.get_event_loop() match = re.match(r'^(?:https://)?mixch.tv/u/(\d+)(?:/.*)?$', sys.argv[1]) if not match: match = re.match(r'^wss://chat.mixch.tv/.*/(\d+)$', sys.argv[1]) if match: path = 'torte_'+match[1]+'.ass' url = 'wss://chat.mixch.tv/torte/room/'+match[1] websocket = loop.run_until_complete(websockets.connect(url)) @asyncio.coroutine def receive(): try: line = yield from websocket.recv() yield process_line(line, live=True) asyncio.ensure_future(receive()) except websockets.ConnectionClosedOK: print() asyncio.ensure_future(shutdown()) try: print('抓取实时评论中,按Ctrl+C终止并保存') asyncio.ensure_future(receive()) loop.run_forever() except KeyboardInterrupt: loop.run_until_complete(websocket.close()) else: path = sys.argv[1]+'.ass' with open(sys.argv[1], 'r', encoding='utf-8') as file: for line in file: process_line(line) loop.run_until_complete(shutdown())