こんにちは。
現在、YouTubeでは長時間の配信を多く見かけるようになり、その結果切り抜き動画の需要が高まっているように思います。
今回、切り抜き動画作成の大部分を自動化するツールを作成したので紹介します。
切り抜き箇所の候補を自動的に見つけたかったため、まずその部分を自動化し、ついでにその他動画編集部分なども自動化したという背景のもと作成されたツールです。
主に以下の処理を自動化しました。
- ライブ配信のアーカイブからチャットとコメントを取得
- 特定のキーワードに該当するチャットとコメントを抽出
- 該当の動画をダウンロード
- 切り抜きや結合を含む動画編集
手動で実施する必要のある処理としては以下が残っています。
- 切り抜き候補を実際の動画で確認し、切り抜き開始時刻と終了時刻を正確に指定
また、本記事で紹介するコードはすべてGitHub上にも公開しています。
更新履歴
日付 | ver. | 更新内容 |
---|---|---|
2025/01/19 | v1.6.0 | 動画URL一覧の出力後にそれを参照して時系列順にチャットをダウンロードするよう変更(既存チャットの重複ダウンロード防止)、チャットの判定段階を1段階追加、カウンターのヘッダにカテゴリ名を表示するよう変更、MoviePy v2.X移行対応 |
2024/11/11 | v1.5.0 | 結合部ノイズ除去追加、カウンター表示機能追加、カウンター機能に伴ったresults.txtの記述方法追加、タイムスタンプでカテゴリごとに採番する機能追加、フォントファイル名に依存しないよう変更、日ごとのタイムスタンプ出力機能追加、生成過程の動画の削除可否を引数で指定するよう変更 |
2024/08/18 | v1.4.0 | チャットデータのダウンロード時にもCookieファイルを参照するよう変更、動画全編ダウンロード時にCookieファイルが参照されない問題の修正、月ごとと年ごとのタイムスタンプ出力機能追加 |
2024/07/31 | v1.3.0 | 音量正規化機能追加、シンボリックリンクでフォントファイルが読み込まれない問題の修正、完成動画の一時ファイルを削除するよう変更、各動画出力後のメモリ解放を明示的に指定、タイムスタンプを先に出力するよう変更、動画合計時間と現在の動画処理時間の標準出力機能追加、results.txtの行頭に回数メモ機能追加、results.txtにコメントアウトの行を入れられるよう変更、動画全編読み込み時のエラー修正、Cookieファイルによりログイン状態で処理できるよう修正 |
2024/06/12 | v1.2.0 | 切り抜き周辺のみのダウンロード機能追加、最高品質がmp4以外の場合に対応、タイムスタンプの同一事象連番対応 |
2024/05/19 | v1.1.0 | タイムスタンプの誤差が累積する問題の修正、表示日付を投稿日から公開日に変更、公開日を任意の文字列に編集できるよう変更(list_date_title.txtの必須化) |
2024/05/10 | v1.0.0 | 大幅改修(sum_superchat.pyの削除(video.shの検索文字列不要化)、文字列検索のpython完結化、results.txtへの全情報集約化、クラスタリングアルゴリズム一部変更(sum_superchat.py削除による、該当チャット直前数秒間のチャット取得のためのリングバッファ追加)、タイトル表示機能追加、切り抜き並べ替え対応、各切り抜き動画ファイル名のmsec化(時刻編集時に以前の切り抜きの削除不要化)、異なるアスペクト比の結合対応、ディレクトリのシンボリックリンク対応、円以外の通貨50種対応、results.txtの空白行許容対応、results.txtの上書き防止、ImageMagickとMoviePyのエラー回避、音のフェード対応) |
2024/03/10 | コメントからの候補抽出機能追加、該当コメント数表示対応、時刻の小数対応 | |
2024/02/27 | チャットダウンロードのみの実行機能追加 | |
2024/02/22 | チャット取得時に前半の一部しか取得されない問題の修正、ダウンロード再試行回数指定機能追加、ダウンロード失敗時のエラー回避 | |
2024/02/20 | 初版として公開 | |
2024/02/11 | 簡易版完成、初試用 | |
2024/02/10 | 開発開始 |
使用したパッケージ
チャットと動画のダウンロード
チャットと動画のダウンロードにはyt-dlp
を使用しました。
このツールを使うと、YouTube(に限らず多くの動画配信プラットフォーム)の大体のデータを取得することができます。
aptやpip等のパッケージマネージャでインストールできるため扱いやすいです。もちろんGitから取得しても良いです。
例えば、YouTubeのあるチャンネル内の動画すべてのチャットとコメントを取得するには、以下のコマンドを一行実行するだけで実現できます。
yt-dlp --skip-download --write-subs --write-comments -o "%(upload_date)s[%(id)s]" チャンネルURL
--write-subs
でライブチャットを取得、--write-comments
でコメントを取得、--skip-download
で動画のダウンロードをスキップ、-o
で出力名のパターンを指定しています。
動画をダウンロードしたい場合は--skip-download
をつけなければ良いです。
このように、コマンド一行で非常に簡単にデータを取得できるようになっています。
オプションも多数用意されているため、カスタマイズ性が高い点も魅力です。
因みに、pipでインストールした場合はpythonコード内のメソッドからも実行可能です。
YouTube Data APIについて
もともとチャットの取得には、Googleが提供するYouTube Data APIを使う予定でした。
しかし、このAPIは現在配信中のライブのチャットを取得することはできますが、過去のアーカイブには非対応でした。
そういう訳で、この方法では動画のURLリストを取得するくらいしかできず、チャットの取得には別の手段をとる必要があったため断念しました。(クローラを用意して実際にページにアクセスするなど)
動画編集
動画編集にはmoviepy
を使用しました。
こちらはよく使われている安定のパッケージだと思います。
動画編集に必要な機能をほとんど備えています。(バグがとても多いですが…)
例えば、今回使用している動画の切り抜きと結合は、以下のように非常に分かりやすい記述で実現できます。
切り抜き
video.subclip(sec_begin, sec_end)
結合
concatenate_videoclips(list_video)
ツール紹介: コード、使い方、実行例
ここからは、作成したツールを使用した実際の切り抜き動画の作成例を紹介し、ソースコードの解説をします。
紹介するコードはGitHub上にも公開しています。
(本記事では、可読性と再現性の担保のため、極力ファイル分割やimportを使用しないようにしています。その結果、重複するコードが何度か出てきますがご了承ください。詳細はREADMEをご確認ください)
できること
まずは、できることを簡潔に説明します。
本ツールは、特定のチャンネル内から特定のシーンを抽出し、それらを結合したまとめ動画を作ります。
以下の手順で実施します。
- チャットの取得、抽出、クラスタリング
コマンド1行を実行し、切り抜き箇所の候補を抽出します。 - 実際にYouTubeで確認
上記の候補を実際にYouTubeで確認し、切り抜き開始時刻と終了時刻を正確に指定します。ここは手動で作業する必要があります。 - 動画ダウンロード、動画編集
コマンド1行を実行し、上記の手動で編集した切り抜き箇所をダウンロードし切り抜きを結合した動画を生成します。
以上で切り抜き動画が完成します。
詳しくは後述しますが、このような動画が生成されます。
手動で確認する部分はチャンネルや切り抜きの期間によっては大変になりますが、その他は自動化されるようにしました。
以下では実行例の紹介と、ソースコードの解説をします。
(具体的な使い方はREADMEに記載されているため省略します)
実行例
実行例としては、VTuberのくしゃみ切り抜き動画の生成過程を紹介します。
幸いなことに(?)、VTuber界隈には「くしゃみ助かる」という文化があるためチャットから判断しやすく、かつ頻度がテスト用に適しています。
例として、先程動画を紹介したRumiさんの切り抜きの様子を見てみましょう。
まずは、チャットの取得、抽出、クラスタリングをします。
実行コマンド
source src/chat.sh data/20241130_rumi UCswRX8mNNdn1fjRctZqzjgA "くしゃみ|クシャミ|噴嚏|嚔|くしゃたす|くしゃ民|Bless you|bless you|Bless u|bless u|BLESS YOU|BLESS U|blessyou|ぶれすゆ|ブレスユ" "助か|たすか|浴びた|あびた|tskr|TSKR|snee|Snee|kushami|((^(は|ハ|ひ|ヒ|へ|ヘ|べ|ベ|ふぇ|フェ|ぶぇ|ブェ)(っ*|ッ*)[きキくク][しシちチ])|(^[くク][しシちチ][ゅュょョ][んン]))|achoo|ACHOO|幫大忙了|くしゃる|くしゃかる" "花粉|大丈夫|お大事に|可愛|かわいい|かわよ|こより|ミュート|mute|MUTE|幫大忙|nice|NICE"
検索文字列(厳しめ)は条件を緩くし過ぎると候補の数が膨大になってしまうので、ある程度条件を厳しめにするのが良いでしょう。ひらがな、カタカナ、漢字、その他いくつかの関連語のorとしています。
(この文字列が投稿された周辺を切り抜き候補とします)
日付は指定していないので全期間の動画が対象です。
ダウンロードには数時間から数日程度かかります。(ここが最も時間のかかる処理だと思われます)
ただ、データ量はそれほど大きいわけではないため、ネットワークを圧迫する心配はありません。
(速度は約1~3Mbit/sです。実行例では6.6GB程度、ホロライブだと100GBを超える場合もありました)
実行後は以下のようなディレクトリ構成になります。
.
├── data
│ └── 20241130_rumi
│ ├── extract
│ │ ├── list_date_title.txt
│ │ ├── list_url.txt
│ │ └── results.txt
│ └── live_chat
│ ├── ...
│ ├── 20210806[7MUzOjbVjb4].info.json
│ ├── 20210806[7MUzOjbVjb4].live_chat.json
│ ├── ...
├── setup.sh
├── src
│ ├── auth
│ │ └── cookies.txt
│ ├── chat.sh
│ ├── clustering_chat.py
│ ├── download_clip.py
│ ├── download_chat.py
│ ├── edit_video.py
│ ├── font
│ │ └── MPLUSRounded1c-Regular.ttf
│ └── video.sh
└── venv
└── yt-dlp_moviepy
data/20241130_rumi/extract/results.txt
に候補のリストが保存されています。
実行例では、候補が約450箇所にクラスタリングされました。
この結果をもとに実際にYouTubeで確認し、results.txtを編集します。
results.txt 編集後 (一部抜粋)
https://youtu.be/JAzH7GpNiEg?t=12860s 35 564 3:34:19,28,30.1,32.4,38.1,44.5,50,17.1 3:35:27 2022/03/23 くしゃみ助かる くしゃみ助かる x 5 要是一個噴嚏能夠拿到一個SC rumi醬就能成為大富翁了 幫大忙了 助かる 助かる 助かる 幫大忙了:_rumiLove: 助かる 幫大忙了 :_rumiLove::_rumiLove::_rumiLove: 助かる 幫大忙拉 幫大忙了:_rumiLove: たすかる コリデール そそ 助かる 助かる 助かる 謝謝 幫大忙了 助かる 助かる 助かる 助かる:_rumiLove: たすかるけど大丈夫? 幫大忙了:_rumiLove::_rumiLove:
https://youtu.be/JAzH7GpNiEg?t=13030s 4 282 3:37:25 3:38:05 2022/03/23 一個噴嚏 加班1小時 怎麼樣? くしゃみ一つでスパチャもらえるのならrumiはもうお金持ちだねって
https://youtu.be/9llaxdAvvPw?t=10513s 22 846 2:55:13,16.9,18.4,21.4,24,27.3 2:55:39 2022/03/26 くしゃみ助かる 助かる 幫大忙了 助かる:_rumiLove: 幫大忙了 助かる 助かる 幫大忙 助かる 助かる 助かる tskr 助かる 助かる 助かる
https://youtu.be/9llaxdAvvPw?t=11533s 15 0 3:12:13,17.4,19.3 3:12:32 2022/03/26 噴嚏時間 [翻譯/CN] 要打噴嚏了 bless u 助かる 助かる 助かる 助かる 助かる 幫大忙了 助かる
開始時刻と終了時刻、注目時刻を編集しています。
手動作業が必要なのは、実際にYouTubeで確認するこの部分のみです。
次に、編集した内容をもとに、元動画のダウンロードと切り抜き動画の生成をします。
実行コマンド
source src/video.sh data/20241130_rumi
生成された動画は以下の通りです。
タイムスタンプは以下のように出力されます。
このタイムスタンプをYouTubeの概要欄に貼り付けると、チャプターが作成されます。
timestamp.txt (一部抜粋)
0:03:27 1. 2021/09/09 https://youtu.be/NsJjHgliG00?t=916s
0:03:34 2. 2021/09/11 https://youtu.be/o98Y7oNGIQw?t=7497s
0:04:02 3. 2021/10/07 https://youtu.be/YaGU7hZ5IZE?t=7868s
0:04:10 3-2. 2021/10/07 https://youtu.be/YaGU7hZ5IZE?t=7905s
実行後のディレクトリ構成は以下の通りです。
.
├── data
│ └── 20240506_tsukino
│ ├── clip
│ │ ├── 20210808[2L5XDGPhnJk]_02627000-02642000.mp4
│ │ ├── ...
│ ├── download
│ │ ├── 20210808[2L5XDGPhnJk].mp4
│ │ ├── ...
│ ├── dst
│ │ ├── clip.mp4
│ │ ├── timestamp.txt
│ │ ├── timestamp1.txt
│ │ ├── timestamp_day.txt
│ │ ├── timestamp_month.txt
│ │ └── timestamp_year.txt
│ ├── extract
│ │ ├── list_date_title.txt
│ │ ├── list_url.txt
│ │ └── results.txt
│ └── live_chat
│ ├── ...
│ ├── 20210806[7MUzOjbVjb4].info.json
│ ├── 20210806[7MUzOjbVjb4].live_chat.json
│ ├── ...
├── setup.sh
├── src
│ ├── auth
│ │ └── cookies.txt
│ ├── chat.sh
│ ├── clustering_chat.py
│ ├── download_clip.py
│ ├── download_chat.py
│ ├── edit_video.py
│ ├── font
│ │ └── MPLUSRounded1c-Regular.ttf
│ └── video.sh
└── venv
└── yt-dlp_moviepy
以上、実際にくしゃみ切り抜き動画を作成した例でした。
ソースコード解説
ここからは、ソースコードの処理を簡単に説明します。
チャットの取得、抽出、クラスタリング
チャットとコメントの取得はyt-dlpのコマンドで実行し、その後以下のpythonコードでチャットを1事象ごとにクラスタリングし、候補をリスト化して出力します。
chat.sh
#!/bin/sh
# 作業用ルートディレクトリで実行 source src/chat.sh arg1 arg2 [arg3 arg4 arg5 arg6 arg7]
# 引数(必須2個 + 任意5個 + オプション): 作成するディレクトリのパス, YouTubeのChannel ID, 検索文字列(厳しめ), 検索文字列(緩め), 検索文字列(カウント用), 取得開始日(YYYYMMDD), 取得終了日(YYYYMMDD), オプション(-f: results.txtを強制上書きする場合)
current_dir=`pwd`
source venv/yt-dlp_moviepy/bin/activate
mkdir "$1"
cd "$1"
mkdir live_chat extract
rm -f extract/list_url.txt
yt-dlp --flat-playlist --print-to-file "%(webpage_url)s" extract/list_url.txt "https://www.youtube.com/channel/$2"
python "${current_dir}"/src/download_chat.py "${@:3}"
python "${current_dir}"/src/clustering_chat.py "${@:3}"
cd "${current_dir}"
deactivate
このシェルスクリプトでは、yt-dlpコマンドで指定チャンネル内すべての動画のチャットとコメントを取得し、その後後述のpythonスクリプトを実行するところまで自動化します。
download_chat.py を表示
import sys # argv
import os # remove, path.isfile, path.dirname
import glob # glob, escape
import yt_dlp # YoutubeDL
def getListURL(path): # ファイルからURLのリストを取得
list_url = []
with open(path) as f:
for line in f:
if len(line) > 11:
list_url.append(line[:-1])
return list_url
def downloadChat(dir_dst, list_url, dateafter, datebefore, cookiefile): # チャットのダウンロード
MAX_RETRY_DOWNLOAD = 3 # ダウンロードに失敗した際の最大再試行回数
option = {
"outtmpl": dir_dst + "%(upload_date)s[%(id)s]", # 出力形式 投稿日[動画ID]
"skip_download": True, # 動画のダウンロードをスキップ
"writeinfojson": True, # infoファイルを出力
"getcomments": True, # コメントをinfoファイルに出力
"writesubtitles": True, # チャットを出力
"daterange": yt_dlp.utils.DateRange(dateafter, datebefore), # ダウンロード対象期間
"ignoreerrors": True, # エラーを無視して続行
"cookiefile": cookiefile,
}
count = 0
with yt_dlp.YoutubeDL(option) as ydl:
for url in list_url:
count += 1
print("download: " + str(count) + " / " + str(len(list_url)))
id = url[-11:]
if len(glob.glob(glob.escape(dir_dst) + "*" + glob.escape(id) + "*")) == 0: # 出力先ファイルが既に存在する場合はダウンロードしない
ydl.download([url])
retry_download = MAX_RETRY_DOWNLOAD
while retry_download > 0: # ダウンロード失敗時は再試行
list_partfile = glob.glob(glob.escape(dir_dst) + "*" + glob.escape(id) + "*.part*")
if len(list_partfile) <= 0:
break
for partfile in list_partfile: # ダウンロード途中のpartファイルを削除してから再ダウンロード
if os.path.isfile(partfile):
print("remove: " + partfile)
os.remove(partfile)
retry_download -= 1
ydl.download([url])
def execute(dir_dst, path_list_url, dateafter, datebefore, cookiefile):
list_url = getListURL(path_list_url)
downloadChat(dir_dst, list_url, dateafter, datebefore, cookiefile)
def main():
dateafter = "" # ダウンロード期間の最初の日付 YYYYMMDD
datebefore = "" # ダウンロード期間の最後の日付 YYYYMMDD
for arg in sys.argv[1:]:
if len(arg) == 8 and arg.isdecimal():
if dateafter == "":
dateafter = arg
elif datebefore == "":
datebefore = arg
if dateafter == "":
dateafter = "00010101"
if datebefore == "":
datebefore = "99991231"
execute("live_chat/", "extract/list_url.txt", dateafter, datebefore, os.path.dirname(__file__) + "/auth/cookies.txt")
if __name__ == "__main__":
main()
clustering_chat.py を表示
import sys # argv
import os # listdir, path.isfile
import re # search
import unicodedata # normalize
def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
begin = str.find(str_begin) + len(str_begin)
if begin < len(str_begin):
return ""
end = str[begin:].find(str_end) + begin
return str[begin:end]
def subStrEnd(str, str_begin, str_end): # 該当範囲の文字列を切り出し(終了文字列から検索)
end = str.find(str_end)
if end < 0:
return ""
begin = str[:end].rfind(str_begin) + len(str_begin)
return str[begin:end]
def getId(filename): # ファイル名からVideoIDを抽出
return subStrEnd(filename, "[", "]")
def getDate(filename): # ファイル名から投稿日を抽出
return subStrEnd(filename, "/", "[")
def getTitle(line): # 生データからタイトルを抽出
return subStrBegin(line, '"title": "', '"')
def getReleaseDate(line): # 生データから公開日を抽出
return subStrBegin(line, '"release_date": "', '"')
def getSecond(line): # 生データから時刻を抽出
str = subStrBegin(line, '"videoOffsetTimeMsec": "', '"')[:-3] # mili sec で記載されているため下3文字削る
if str == "":
return 0
return int(str)
def getText(line): # 生データからチャットを抽出
text = ""
list_str_begin_end = (('"text": "', '"'), ('"shortcuts": ["', '"'))
str_end = ''
while True: # チャットがいくつかに分割される場合があるためすべて抽出して結合
begin = len(line)
for str_begin_end in list_str_begin_end:
(str_begin, str_end_current) = str_begin_end
begin_current = line.find(str_begin) + len(str_begin)
if begin_current >= len(str_begin) and begin_current < begin:
begin = begin_current
str_end = str_end_current
if begin >= len(line):
break
end = line[begin:].find(str_end) + begin
text += line[begin:end]
line = line[end + len(str_end):]
return text
def exchangeToYen(amount): # 円に両替した場合の金額
YEN_PER = {"¥": 1, "$": 150, "€": 160, "₩": 0.1, "£": 190, "₱": 2.5, "₹": 1.8, "₫": 0.006, "₪": 41, "MYR": 32, "CA$": 110, "NT$": 4.7, "THB": 4, "HK$": 19, "A$": 97, "ARS": 0.17, "PLN": 37, "MX$": 8.7, "CLP": 0.15, "IDR": 0.01, "SGD": 110, "R$": 30, "ZAR": 7.9, "RON": 32, "NOK": 14, "NZ$": 90, "RUB": 1.6, "DKK": 22, "SEK": 14, "CHF": 170, "PEN": 40, "CRC": 0.3, "RSD": 1.4, "UYU": 3.8, "DOP": 2.5, "ISK": 1.1, "SAR": 40, "HUF": 40, "CZK": 6.4, "BGN": 82, "BYN": 45, "GTQ": 19, "BOB": 21, "PYG": 0.02, "TRY": 4.6, "COP": 0.038, "HRK": 20, "AED": 40, "KES": 1, "NIO": 4, "ден": 2.6}
# 為替レートは2024/03/10時点、有効数字2桁程度。未記載の通貨は0円で換算
amount_normalized = unicodedata.normalize("NFKC", amount).replace(" ", "").replace(",", "")
if amount_normalized == "":
return 0
for symbol in YEN_PER:
if amount_normalized[:len(symbol)] == symbol:
return float(amount_normalized[len(symbol):]) * YEN_PER[symbol]
return 0
def getYenSuperchat(line): # 生データからスーパーチャット金額を抽出
return exchangeToYen(subStrBegin(line, '"purchaseAmountText": {"simpleText": "', '"'))
def isInfo(filename): # コメントならTrue, チャットならFalseを返す
return subStrBegin(filename, "].", ".") == "info"
def isValidChat(line): # データが重複していないか確認
if line.find("addChatItemAction") < 0:
return False
return True
def containStr(text, query): # 文字列が含まれるか判定
if query == "":
return False
return re.search(query, text) != None
def getCommentList(line, query): # 生データから該当コメントを抽出してリスト化
if query == "":
return []
list_comment = []
str_begin = '"text": "'
str_end = '"'
while True: # 全コメントを抽出
begin = line.find(str_begin) + len(str_begin)
if begin < len(str_begin):
break
end = line[begin:].find(str_end) + begin
comment = line[begin:end]
if containStr(comment, query):
list_comment.append(comment)
line = line[end + len(str_end):]
return list_comment
def secondToTime(second_src): # 秒数(int)から時間表示(str)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
second = second_src % SECOND_PER_MINUTE
second_src = int((second_src - second) / SECOND_PER_MINUTE)
minute = second_src % MINUTE_PER_HOUR
second_src = int((second_src - minute) / MINUTE_PER_HOUR)
hour = second_src
return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)
def timeToDisplayTime(str): # 時間を表示用に変換(yyyyMMdd -> yyyy/MM/dd)
return str[0:4] + "/" + str[4:6] + "/" + str[6:8]
def clusteringChat(dir, list_query): # チャットを1事象ごとにクラスタリング
SEC_CLUSTERING = 90 # チャット間隔が90秒未満の場合、同じ事象に対するコメントだと判定(初期値)
SEC_PRE = 60 # lv0チャット1個目の何秒前からチャット数を数えるか
MAX_EXAMPLE_CHAT = 24 # 参考例として出力する該当チャット数の最大値
MAX_EXAMPLE_LV = 2 # 参考例として出力する該当チャットLvの種類数
MAX_EXAMPLE_COMMENT = 24 # 参考例として出力する該当コメント数の最大値
EVAL_LV = (1, 0.25) # 各クエリlvに該当するチャット1つあたりの影響度。SEC_PRE秒間に加算値が1以上の場合、該当箇所と判定
results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: チャット/コメント数, [n][5][m]: チャット/コメント例 (m <= 12), [n][6]: 投げ銭金額, [n][7]: 公開日
list_date_title = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: タイトル
NUM_QUERY_LV = 3 # クエリの種類数
query_lv = [""] * NUM_QUERY_LV # 検索条件厳しい順
for i in range(min(len(list_query), NUM_QUERY_LV)):
query_lv[i] = list_query[i]
for i in range(min(len(list_query), NUM_QUERY_LV), len(query_lv) - 1):
query_lv[i] = query_lv[len(list_query) - 1]
list_file = os.listdir(dir)
list_file.sort() # 日付順に並び替え 日付が同じ場合はID順
BUF_SIZE = 1000 # 下記リングバッファのサイズ
count_progress = 0 # 進捗表示用カウンタ
for filename in list_file:
count_progress += 1
print("\r" + "clustering: " + str(count_progress) + " / " + str(len(list_file)), end="")
buf_lv_count = [(0, 0, 0)] * BUF_SIZE # 該当チャットの秒数とクエリlvと金額を保持するリングバッファ
buf_lv_itr = 0 # 上記リングバッファのイテレータ
sec_end_same = 0 # 同じ事象だと判定される期限
release_date = "00000000"
with open(dir + filename) as f:
# 入力データは、1チャットにつき1行。コメントは全データが1行
id = getId(filename)
date = getDate(filename)
if date.isdecimal() and int(release_date) < int(date): # 投稿日 < 公開日 が成り立つように。ただし、infoファイルから読み込まれるはずなので、このif文の有無で結果は変わらないはず
release_date = date
if isInfo(filename): # コメントの場合、開始終了秒数を0として追加
for line in f:
list_date_title.append((id, date, getTitle(line)))
if getReleaseDate(line) != "":
release_date = getReleaseDate(line)
list_comment = getCommentList(line, query_lv[0])
if len(list_comment) > 0: # コメントに検索文字列が含まれていない場合は除外
results.append([id, date, 0, 0, len(list_comment), [list_comment[0:MAX_EXAMPLE_COMMENT]], 0, release_date])
else: # チャットの場合
for line in f:
if not isValidChat(line): # チャットが既存データと重複する場合は除外
continue
chat_text = getText(line)
yen = getYenSuperchat(line)
query_lv_current = NUM_QUERY_LV
for i in range(NUM_QUERY_LV):
if containStr(chat_text, query_lv[i]):
query_lv_current = i
break
if query_lv_current >= NUM_QUERY_LV and yen <= 0: # チャットに検索文字列が含まれていない場合は除外
continue
chat_second = getSecond(line)
is_same = len(results) > 0 and results[-1][0] == id and results[-1][3] > 0 and chat_second <= sec_end_same
# チャット間隔が SEC_CLUSTERING 秒未満の場合、同じ事象に対するコメントだと判定
if not is_same:
buf_lv_count[buf_lv_itr] = (chat_second, query_lv_current, yen) # バッファに現在のチャットを追加
buf_lv_itr = (buf_lv_itr + 1) % BUF_SIZE
count_pre = 0
yen_pre = 0
eval_pre = 0 # 該当チャット判定用評価値。1以上なら該当
for itr in range(buf_lv_itr - 1, buf_lv_itr - 1 - BUF_SIZE, -1): # 最新のチャットから遡る
if buf_lv_count[itr][0] <= max(0, chat_second - SEC_PRE):
break
count_pre += 1
yen_pre += buf_lv_count[itr][2]
if buf_lv_count[itr][1] < len(EVAL_LV):
eval_pre += EVAL_LV[buf_lv_count[itr][1]]
if eval_pre >= 1: # 該当チャットである場合
results.append([id, date, chat_second, chat_second, count_pre, [[] for i in range(MAX_EXAMPLE_LV)], yen_pre, release_date])
sec_end_same = 0
is_same = True
if is_same:
results[-1][3] = max(chat_second, results[-1][3])
results[-1][4] += 1
results[-1][6] += yen
if len(results[-1][5]) < MAX_EXAMPLE_CHAT and query_lv_current < MAX_EXAMPLE_LV: # チャット例を MAX_EXAMPLE_CHAT 個まで保持
results[-1][5][query_lv_current].append(chat_text)
sec_end_same += (chat_second + SEC_CLUSTERING - sec_end_same) * SEC_CLUSTERING / (SEC_CLUSTERING + (chat_second - results[-1][2]) * 4) # 期限の加算時間を徐々に小さくする
return results, list_date_title
def writeResults(results, path): # 結果を出力: URL チャット/コメント数 金額 開始秒数 終了秒数 投稿日 チャット例
DELIMITER = " " # 出力時の区切り文字
NEWLINE = "\n" # 改行文字
SEC_BUFFER = 15 # チャット1個目の何秒前から動画を確認するか
with open(path, "w") as f:
for (id, _, sec_begin, sec_end, count, list_list_text, yen, release_date) in results:
second = max(sec_begin - SEC_BUFFER, 0)
f.write("https://youtu.be/" + id + "?t=" + str(second) + "s" + DELIMITER)
f.write(str(count) + DELIMITER)
f.write(str(int(yen)) + DELIMITER)
f.write(secondToTime(sec_begin) + DELIMITER)
f.write(secondToTime(sec_end) + DELIMITER)
f.write(timeToDisplayTime(release_date))
for list_text in list_list_text:
f.write(DELIMITER)
for text in list_text:
f.write(text + DELIMITER)
f.write(NEWLINE)
def writeListDateTitle(list_date_title, path): # ID,日付,タイトルを出力
DELIMITER = " " # 出力時の区切り文字
NEWLINE = "\n" # 改行文字
with open(path, "w") as f:
for (id, date, title) in list_date_title:
f.write(id + DELIMITER)
f.write(date + DELIMITER)
f.write(title + NEWLINE)
def execute(dir_src, path_results, path_list_date_title, list_query, force):
if len(list_query) <= 0: # クエリ文字列がなければ終了
return
if os.path.isfile(path_results) and not force: # 既に出力先ファイルが存在し、オプションが指定されていないなら終了
print(path_results + " already exists.")
return
results, list_date_title = clusteringChat(dir_src, list_query)
writeResults(results, path_results)
writeListDateTitle(list_date_title, path_list_date_title)
print()
print("results: " + path_results)
print("list_date_title: " + path_list_date_title)
def main():
force = False # results.txtを強制的に上書きするか
list_query = []
for i in range(1, len(sys.argv)):
arg = sys.argv[i]
if arg == "-f" or arg == "-F" or arg == "--force":
force = True
elif not (len(arg) == 8 and arg.isdecimal()):
list_query.append(arg)
execute("live_chat/", "extract/results.txt", "extract/list_date_title.txt", list_query, force)
if __name__ == "__main__":
main()
まず、チャットやコメントを含むjsonファイルから、チャット内容や投稿時刻、スーパーチャット金額などを文字列判定で抽出します。
例えば、チャットテキストなら "text": "チャットテキスト"
というような具合です。
さらに、チャットとコメントのデータから正規表現で文字列検索し、該当チャットを抽出します。
該当チャットの間隔が90秒以内の場合、同じ事象に対するチャットであると判断しています。
正確には、事象発生からの経過時間に応じて、同じ事象と判定される間隔が短くなるようにしています。
最初の検索文字列(厳しめ)が投稿される直前の検索文字列(緩め)もカウントしたいので、リングバッファに直近のチャットを保持しておき、検索文字列(厳しめ)が発生した時点から少し前までの検索文字列(緩め)の数もカウントするようにしています。
余談ですが、上記の結果として出力されるresults.txt編集時、テキストエディタとしてVSCodeを利用すると、Ctrlを押しながらリンクをクリックすればブラウザで該当URLのページが開かれるので便利です。
(results.txtの動画URLは、検索文字列(厳しめ)が発生した時点の15秒前から再生されるようにしています)
動画ダウンロード、動画編集
ここからは、先程編集した結果をもとに切り抜き動画を生成します。
まず、後述のpythonスクリプトを自動化するシェルスクリプトです。
video.sh
#!/bin/sh
# 作業用ルートディレクトリで実行 source src/video.sh arg1 [option]
# 引数(必須1個 + オプション): 作成したディレクトリのパス, オプション (-a: 動画全編をダウンロードする場合, -d: ダウンロードした元動画を削除する場合)
current_dir=`pwd`
source venv/yt-dlp_moviepy/bin/activate
cd "$1"
mkdir download clip
python "${current_dir}"/src/download_clip.py ${@:2}
mkdir dst
python "${current_dir}"/src/edit_video.py
cd "${current_dir}"
deactivate
最初に、動画のダウンロードと切り抜きをします。
download_clip.py を表示
import sys # argv
import os # remove, path.isfile, path.dirname
import operator # itemgetter
import csv # reader
import glob # glob, escape
import yt_dlp # YoutubeDL
from moviepy import *
#from moviepy.config import change_settings
#change_settings({"FFMPEG_BINARY":"ffmpeg"}) # moviepyでnvencを呼び出せない問題の対応
def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
begin = str.find(str_begin) + len(str_begin)
if begin < len(str_begin):
return ""
end = str[begin:].find(str_end) + begin
return str[begin:end]
def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
DELIMITER = ":" # 時間表示の区切り文字
index_delimiter = str.find(DELIMITER)
hour = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
index_delimiter = str.find(DELIMITER)
minute = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
second = float(str)
return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second
def secondToTime(second_src): # 秒数(float)から時間表示(str)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
second_src = round(second_src) # 小数点以下は四捨五入
second = second_src % SECOND_PER_MINUTE
second_src = int((second_src - second) / SECOND_PER_MINUTE)
minute = second_src % MINUTE_PER_HOUR
second_src = int((second_src - minute) / MINUTE_PER_HOUR)
hour = second_src
return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)
def getVideoPath(filename): # 拡張子も含めた動画のパスを取得
list_extension = ("mp4", "webm", "mkv")
for extension in list_extension:
path = filename + "." + extension
if os.path.isfile(path):
return path
print("file not found: " + filename)
return ""
def getDictDateTitle(path): # ファイルから日付とタイトルの辞書を取得
if not os.path.isfile(path):
return {}
dict_date_title = {}
with open(path) as f:
for line in f:
dict_date_title[line[:11]] = (line[12:20], line[21:-1])
return dict_date_title
def getResults(path, dict_date_title): # ファイルから結果を取得
DELIMITER = " " # 区切り文字
results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数
count_clip = 0
count_memo = 0
comment_out = False
with open(path) as f:
reader = csv.reader(f, delimiter=DELIMITER)
for row in reader:
if len(row) > 0 and row[0] == "//":
continue
if len(row) > 0 and "" in row[0]:
comment_out = False
if comment_out: # コメントアウト中はすべて無視
continue
if len(row) < 6:
continue
if (row[0][:4] != "http" or "," in row[3]) and "http" in row[0]: # 未編集行は除外
id = subStrBegin(row[0], "youtu.be/", "?")
date = row[5]
if id in dict_date_title:
date = dict_date_title[id][0]
time_str = row[3] + "," # 開始時刻とカウント時刻
sec_begin = timeToSecond(time_str[:time_str.find(",")])
results.append((id, date, sec_begin, timeToSecond(row[4])))
time_list = [sec_begin]
time_str = time_str[time_str.find(",") + 1:]
if "," in time_str:
while len(time_str) > 1:
time_float = float(time_str[:time_str.find(",")])
time_hour = time_float // 10000
time_float = time_float - time_hour * 10000
time_minute = time_float // 100
time_float = time_float - time_minute * 100
time_second = time_float
is_hour_zero = False
is_minute_zero = False
if time_hour == 0:
time_hour = time_list[-1] // 3600
is_hour_zero = True
if time_minute == 0:
is_minute_zero = True
time_minute = (time_list[-1] - time_hour * 3600) // 60
time_total_second = time_hour * 3600 + time_minute * 60 + time_second
if time_total_second < time_list[-1]:
if is_minute_zero:
time_minute += 1
elif is_hour_zero:
time_hour += 1
else:
print("Incorrect time: " + date + "[" + id + "]: " + time_list)
time_total_second = time_hour * 3600 + time_minute * 60 + time_second
time_list.append(time_total_second)
time_str = time_str[time_str.find(",") + 1:]
if timeToSecond(row[4]) <= time_list[-1]:
print("Incorrect time: " + date + "[" + id + "]")
time_list = [a - b for a, b in zip(time_list, [sec_begin] * len(time_list))]
head = row[0][:row[0].find("http")] # 行頭のメモ
head_count = -1
if head.isdecimal():
head_count = int(head)
count_clip += 1
if head_count >= 0: # 行頭のメモカウント機能
count_memo += head_count
else:
count_memo += len(time_list) - 1
print("count clip: " + str(count_clip))
if count_memo > 0:
print("count memo: " + str(count_memo))
return results
def printDurationFromResults(results):
duration_sum = 0
print("[", end="")
for (_, _, sec_begin, sec_end) in results:
print(secondToTime(sec_end - sec_begin), end=", ")
duration_sum += sec_end - sec_begin
print("]")
print("total: " + secondToTime(duration_sum))
def downloadAllAndClip(results, dir_download, dir_clip, remove_original, cookiefile): # 各動画のダウンロードと切り抜き
option = {
"outtmpl": dir_download + "%(upload_date)s[%(id)s].%(ext)s", # 出力形式 投稿日[動画ID].mp4
#"format": "(bv*[vcodec~='^((he|a)vc|h26[45])']+ba) / (bv*+ba/b)", # Download the best video with either h264 or h265 codec, or the best video if there is no such video
#"format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b", # Download the best mp4 video available, or the best video if no mp4 available
"ignoreerrors": True, # エラーを無視して続行
"cookiefile": cookiefile,
}
MAX_RETRY_DOWNLOAD = 6 # ダウンロードに失敗した際の最大再試行回数
results.sort(key=operator.itemgetter(1, 0, 2, 3)) # 日付順で、動画ごとにすべての切り抜きを作成
len_results = len(results)
with yt_dlp.YoutubeDL(option) as ydl:
for i in range(len_results):
(id, date, sec_begin, sec_end) = results[i]
url = "https://www.youtube.com/watch?v=" + id
filename = date + "[" + id + "]"
filename_download = dir_download + filename
path_download = ""
str_sec_begin = str(int(sec_begin * 1000)).zfill(8)
str_sec_end = str(int(sec_end * 1000)).zfill(8)
path_clip = dir_clip + filename + "_" + str_sec_begin + "-" + str_sec_end + ".mp4"
print(str(i) + "/" + str(len(results)) + ": " + path_clip)
if not os.path.isfile(path_clip): # 出力先ファイルが既に存在する場合は動画生成しない
retry_download = MAX_RETRY_DOWNLOAD
while retry_download > 0:
path_download = getVideoPath(filename_download)
if path_download != "": # 存在しない場合のみダウンロード
break
retry_download -= 1
ydl.download([url])
if path_download != "": # ダウンロードに失敗した場合はスキップ
if sec_begin < sec_end: # 時刻指定に誤りがある場合はスキップ
videoclip = VideoFileClip(path_download).subclipped(sec_begin, sec_end)
videoclip.write_videofile(path_clip, codec="mpeg4", bitrate="1000000000")
if remove_original and (i == len_results - 1 or id != results[i + 1][0]): # 用済みになった動画は、オプションがTrueなら削除
if os.path.isfile(path_download):
os.remove(path_download)
def downloadAllAudioAndClip(results, dir_download, dir_clip, remove_original, cookiefile): # すべての音声と切り抜き部分のみの映像をダウンロード
MAX_RETRY_DOWNLOAD = 6 # ダウンロードに失敗した際の最大再試行回数
SEC_INTERVAL_BEGIN_DOWNLOAD = 12 # 切り抜き前に指定秒数を追加した部分をダウンロード
SEC_INTERVAL_END_DOWNLOAD = 4 # 切り抜き後に指定秒数を追加した部分をダウンロード
SEC_INTERVAL_BEGIN_CLIP = 4 # 前に指定秒数以上が存在するダウンロード動画から切り抜きを作成
SEC_INTERVAL_END_CLIP = 0.1 # 後に指定秒数以上が存在するダウンロード動画から切り抜きを作成
results.sort(key=operator.itemgetter(1, 0, 2, 3)) # 日付順にダウンロード
len_results = len(results)
for i in range(len_results):
(id, date, sec_begin, sec_end) = results[i]
if sec_begin >= sec_end: # 時刻指定に誤りがある場合はスキップ
continue
url = "https://www.youtube.com/watch?v=" + id
filename = date + "[" + id + "]"
filename_download_pre = dir_download + filename
path_download_video = ""
path_download_audio = ""
str_sec_begin = str(round(sec_begin * 1000)).zfill(8)
str_sec_end = str(round(sec_end * 1000)).zfill(8)
path_clip = dir_clip + filename + "_" + str_sec_begin + "-" + str_sec_end + ".mp4"
print(str(i + 1) + "/" + str(len(results)) + ": " + path_clip)
if not os.path.isfile(path_clip): # 出力先ファイルが既に存在する場合は動画生成しない
retry_download = MAX_RETRY_DOWNLOAD
sec_begin_download = 0
sec_end_download = 0
while retry_download > 0:
for downloaded in glob.glob(glob.escape(filename_download_pre) + "*"):
index_extension = downloaded.rfind(".")
index_id_end = downloaded[:index_extension].rfind("]")
if index_id_end + 1 == index_extension: # 動画が開始から終了まですべてダウンロードされている場合
sec_begin_download = 0
sec_end_download = 0
path_download_video = downloaded
path_download_audio = downloaded
break
elif "audio." in downloaded: # 音声が開始から終了まですべてダウンロードされている場合
path_download_audio = downloaded
if path_download_video != "" and path_download_audio != "":
break
else: # 切り抜き付近のみがダウンロードされている場合
index_sec_end = downloaded[:index_extension].rfind("-")
index_sec_begin = downloaded[:index_sec_end].rfind("_")
if downloaded[index_sec_end + 1:index_extension].isdecimal(): # partファイルの場合は無視
sec_begin_download_current = int(downloaded[index_sec_begin + 1:index_sec_end]) / 1000
sec_end_download_current = int(downloaded[index_sec_end + 1:index_extension]) / 1000
if (sec_begin_download_current <= sec_begin - SEC_INTERVAL_BEGIN_CLIP or sec_begin_download_current == 0) and sec_end_download_current >= sec_end + SEC_INTERVAL_END_CLIP: # 適するダウンロード動画が存在する場合
path_download_video = downloaded
sec_begin_download = sec_begin_download_current
sec_end_download = sec_end_download_current
if path_download_video != "" and path_download_audio != "":
break
if path_download_video != "" and path_download_audio != "":
break
retry_download -= 1
if path_download_video == "":
sec_begin_download = round(sec_begin - SEC_INTERVAL_BEGIN_DOWNLOAD)
if sec_begin_download < 0:
sec_begin_download = 0
sec_end_download = round(sec_end + SEC_INTERVAL_END_DOWNLOAD)
str_sec_begin_download = str(round(sec_begin_download * 1000)).zfill(8)
str_sec_end_download = str(round(sec_end_download * 1000)).zfill(8)
filename_download = dir_download + filename + "_" + str_sec_begin_download + "-" + str_sec_end_download
option = {
"outtmpl": filename_download + ".%(ext)s", # 出力形式 投稿日[動画ID]_開始ミリ秒-終了ミリ秒.mp4
#"format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b", # Download the best mp4 video available, or the best video if no mp4 available
"ignoreerrors": True, # エラーを無視して続行
"download_ranges": lambda info_dict, ydl: [{"start_time": sec_begin_download, "end_time": sec_end_download}],
"cookiefile": cookiefile
}
with yt_dlp.YoutubeDL(option) as ydl:
ydl.download([url])
if path_download_audio == "":
filename_download = dir_download + filename + "audio"
option = {
"outtmpl": filename_download + ".%(ext)s", # 出力形式 投稿日[動画ID]_開始ミリ秒-終了ミリ秒.mp4
"format": "ba", # 音声のみ
"ignoreerrors": True, # エラーを無視して続行
"cookiefile": cookiefile
}
with yt_dlp.YoutubeDL(option) as ydl:
ydl.download([url])
if path_download_video != "" and path_download_audio != "":
videoclip = VideoFileClip(path_download_video) # ダウンロードの開始終了秒数をyt-dlpが厳守しない(少し長めになる)ので、切り抜き位置を調整する。現在は、終了秒数が一致している想定で調整している
audioclip = AudioFileClip(path_download_audio).subclipped(sec_begin, sec_end)
if sec_end_download <= 0:
sec_end_download = videoclip.duration
sec_begin_clip = videoclip.duration - sec_end_download + sec_begin # = videoclip.duration - (sec_end_download - sec_begin_download) + (sec_begin - sec_begin_download)
sec_end_clip = videoclip.duration - sec_end_download + sec_end
subclip = videoclip.subclipped(sec_begin_clip, sec_end_clip).with_audio(audioclip) # 映像と音声を結合
subclip.write_videofile(path_clip, codec="mpeg4", bitrate="1000000000")
if remove_original and (i == len_results - 1 or id != results[i + 1][0]): # 用済みになった動画は、オプションがTrueなら削除
for downloaded in glob.glob(glob.escape(filename_download_pre) + "*"):
os.remove(downloaded)
def downloadOnlyClip(results, dir_download, dir_clip, remove_original, cookiefile): # 切り抜き部分のみをダウンロード (ToDo: 現在は音声ずれが確率的に発生するため使用しない)
MAX_RETRY_DOWNLOAD = 6 # ダウンロードに失敗した際の最大再試行回数
SEC_INTERVAL_BEGIN_DOWNLOAD = 2 # 切り抜き前に指定秒数を追加した部分をダウンロード
SEC_INTERVAL_END_DOWNLOAD = 2 # 切り抜き後に指定秒数を追加した部分をダウンロード
SEC_INTERVAL_BEGIN_CLIP = 0 # 前に指定秒数以上が存在するダウンロード動画から切り抜きを作成
SEC_INTERVAL_END_CLIP = 0 # 後に指定秒数以上が存在するダウンロード動画から切り抜きを作成
results.sort(key=operator.itemgetter(1, 0, 2, 3)) # 日付順にダウンロード
len_results = len(results)
for i in range(len_results):
(id, date, sec_begin, sec_end) = results[i]
if sec_begin >= sec_end: # 時刻指定に誤りがある場合はスキップ
continue
url = "https://www.youtube.com/watch?v=" + id
filename = date + "[" + id + "]"
filename_download_pre = dir_download + filename
path_download = ""
str_sec_begin = str(round(sec_begin * 1000)).zfill(8)
str_sec_end = str(round(sec_end * 1000)).zfill(8)
path_clip = dir_clip + filename + "_" + str_sec_begin + "-" + str_sec_end + ".mp4"
print(str(i) + "/" + str(len(results)) + ": " + path_clip)
if not os.path.isfile(path_clip): # 出力先ファイルが既に存在する場合は動画生成しない
retry_download = MAX_RETRY_DOWNLOAD
sec_begin_download = 0
sec_end_download = 0
while retry_download > 0:
for downloaded in glob.glob(glob.escape(filename_download_pre) + "*"):
index_extension = downloaded.rfind(".")
index_id_end = downloaded[:index_extension].rfind("]")
if index_id_end + 1 == index_extension: # 動画が開始から終了まですべてダウンロードされている場合
sec_begin_download = 0
sec_end_download = 0
path_download = downloaded
break
else: # 切り抜き付近のみがダウンロードされている場合
index_sec_end = downloaded[:index_extension].rfind("-")
index_sec_begin = downloaded[:index_sec_end].rfind("_")
if downloaded[index_sec_end + 1:index_extension].isdecimal(): # partファイルの場合は無視
sec_begin_download = int(downloaded[index_sec_begin + 1:index_sec_end]) / 1000
sec_end_download = int(downloaded[index_sec_end + 1:index_extension]) / 1000
if (sec_begin_download <= sec_begin - SEC_INTERVAL_BEGIN_CLIP or sec_begin_download == 0) and sec_end_download >= sec_end + SEC_INTERVAL_END_CLIP: # 適するダウンロード動画が存在する場合
path_download = downloaded
break
if path_download != "":
break
sec_begin_download = round(sec_begin - SEC_INTERVAL_BEGIN_DOWNLOAD)
if sec_begin_download < 0:
sec_begin_download = 0
sec_end_download = round(sec_end + SEC_INTERVAL_END_DOWNLOAD)
str_sec_begin_download = str(round(sec_begin_download * 1000)).zfill(8)
str_sec_end_download = str(round(sec_end_download * 1000)).zfill(8)
filename_download = dir_download + filename + "_" + str_sec_begin_download + "-" + str_sec_end_download
retry_download -= 1
option = {
"outtmpl": filename_download + ".%(ext)s", # 出力形式 投稿日[動画ID]_開始ミリ秒-終了ミリ秒.mp4
#"format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b", # Download the best mp4 video available, or the best video if no mp4 available
"ignoreerrors": True, # エラーを無視して続行
"download_ranges": lambda info_dict, ydl: [{"start_time": sec_begin_download, "end_time": sec_end_download}],
"cookiefile": cookiefile
}
with yt_dlp.YoutubeDL(option) as ydl:
ydl.download([url])
if path_download != "":
videoclip = VideoFileClip(path_download) # ダウンロードの開始終了秒数をyt-dlpが厳守しない(少し長めになる)ので、切り抜き位置を調整する。現在は、終了秒数が一致している想定で調整している
print(videoclip.duration, videoclip.audio.duration)
if sec_end_download <= 0:
sec_end_download = videoclip.duration
sec_begin_clip = videoclip.duration - sec_end_download + sec_begin # = videoclip.duration - (sec_end_download - sec_begin_download) + (sec_begin - sec_begin_download)
sec_end_clip = videoclip.duration - sec_end_download + sec_end
subclip = videoclip.subclipped(sec_begin_clip, sec_end_clip)
subclip.write_videofile(path_clip, codec="mpeg4", audio_codec="libvorbis", bitrate="1000000000")
if remove_original and (i == len_results - 1 or id != results[i + 1][0]): # 用済みになった動画は、オプションがTrueなら削除
for downloaded in glob.glob(glob.escape(filename_download_pre) + "*"):
os.remove(downloaded)
def execute(path_results, path_list_date_title, dir_download, dir_clip, download_all, remove_original, cookiefile):
dict_date_title = getDictDateTitle(path_list_date_title)
results = getResults(path_results, dict_date_title)
printDurationFromResults(results)
if download_all:
downloadAllAndClip(results, dir_download, dir_clip, remove_original, cookiefile)
else:
downloadAllAudioAndClip(results, dir_download, dir_clip, remove_original, cookiefile)
def main():
download_all = False # 全編をダウンロードするか
remove_original = False # ダウンロードした元動画を削除するか
for arg in sys.argv[1:]:
download_all = download_all or (arg == "-a" or arg == "-A" or arg == "--all")
remove_original = remove_original or (arg == "-d" or arg == "-D" or arg == "--delete")
execute("extract/results.txt", "extract/list_date_title.txt", "download/", "clip/", download_all, remove_original, os.path.dirname(__file__) + "/auth/cookies.txt")
if __name__ == "__main__":
main()
コマンドからでもダウンロードできるのですが、1動画ごとにダウンロード、切り抜き作成、元動画削除(オプション)をまとめて実行したかったため、pythonスクリプト内でダウンロードすることにしました。
切り抜き周辺のみのダウンロード機能と元動画削除機能をつけることで、大量の切り抜き箇所があったとしてもストレージを圧迫しないようにしています。
動画完成後に切り抜き時刻を変更した場合でも、変更箇所のみ切り抜きし直すようにしました。
切り抜き前後に数秒追加した部分をダウンロードするので、切り抜き開始終了時刻を少し編集しても再ダウンロード不要です。
処理としては、切り抜き部分を完全に含むダウンロード動画が存在するまでダウンロードを繰り返しています。
条件を満たす元動画が存在する状態になったら、該当部分の切り抜き動画を生成します。
この処理を、results.txtで編集された行すべてに対して実施します。
次に、上記で生成された各切り抜きを結合します。
edit_video.py を表示
import sys # argv
import os # remove, rename, path.isfile, path.dirname
import gc # collect
import csv # reader
import glob # glob, escape
import numpy # array, argmin, abs
from moviepy import *
#from moviepy.config import change_settings
#change_settings({"FFMPEG_BINARY":"ffmpeg"}) # moviepyでnvencを呼び出せない問題の対応
def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
begin = str.find(str_begin) + len(str_begin)
if begin < len(str_begin):
return ""
end = str[begin:].find(str_end) + begin
return str[begin:end]
def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
DELIMITER = ":" # 時間表示の区切り文字
index_delimiter = str.find(DELIMITER)
hour = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
index_delimiter = str.find(DELIMITER)
minute = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
second = float(str)
return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second
def secondToTime(second_src): # 秒数(float)から時間表示(str)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
second_src = round(second_src) # 小数点以下は四捨五入
second = second_src % SECOND_PER_MINUTE
second_src = int((second_src - second) / SECOND_PER_MINUTE)
minute = second_src % MINUTE_PER_HOUR
second_src = int((second_src - minute) / MINUTE_PER_HOUR)
hour = second_src
return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)
def getVideoPath(filename): # 拡張子も含めた動画のパスを取得
list_extension = ("mp4", "webm", "mkv")
for extension in list_extension:
path = filename + "." + extension
if os.path.isfile(path):
return path
print("file not found: " + filename)
return ""
def getDictDateTitle(path): # ファイルから日付とタイトルの辞書を取得
if not os.path.isfile(path):
return {}
dict_date_title = {}
with open(path) as f:
for line in f:
dict_date_title[line[:11]] = (line[12:20], line[21:-1])
return dict_date_title
def getResults(path, dict_date_title): # ファイルから結果を取得
DELIMITER = " " # 区切り文字
results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: チャット数, [n][5]: コメント数, [n][6]: 金額, [n][7]: タイトル, [n][8]: 公開日(表示用文字列), [n][9]: カウンター関連情報, [n][10]: カテゴリ
dict_count_comment = {}
count_clip = 0
count_memo = 0
count_begin = 0 # 各切り抜きのカウンター開始値
display_counter = False
counter_title = ""
comment_out = False
category = ""
with open(path) as f:
reader = csv.reader(f, delimiter=DELIMITER)
for row in reader:
if len(row) > 0 and row[0] == "//":
continue
if len(row) > 0 and "" in row[0]:
comment_out = False
if comment_out: # コメントアウト中はすべて無視
continue
if len(row) > 0 and "")
if set_count_begin.isdecimal():
count_begin = int(set_count_begin)
elif len(row) > 0 and " " in row[0]:
display_counter = False
if len(row) > 0 and "")
if len(category) > 0 and category[-1] == "/":
category = category[:-1]
counter_title = category
if len(row) < 6:
continue
is_comment = row[4] == "0:00:00" # 終了時刻が0ならコメントと判定
id = subStrBegin(row[0], "youtu.be/", "?")
if is_comment:
dict_count_comment[id] = int(row[1])
if (row[0][:4] != "http" or "," in row[3]) and "http" in row[0]: # 未編集行は除外
count_chat = 0
if not is_comment:
count_chat = int(row[1])
date = row[5]
title = ""
if id in dict_date_title:
date = dict_date_title[id][0]
title = dict_date_title[id][1]
time_str = row[3] + "," # 開始時刻とカウント時刻
sec_begin = timeToSecond(time_str[:time_str.find(",")])
time_list = [sec_begin]
time_str = time_str[time_str.find(",") + 1:]
if "," in time_str:
while len(time_str) > 1:
time_float = float(time_str[:time_str.find(",")])
time_hour = time_float // 10000
time_float = time_float - time_hour * 10000
time_minute = time_float // 100
time_float = time_float - time_minute * 100
time_second = time_float
is_hour_zero = False
is_minute_zero = False
if time_hour == 0:
time_hour = time_list[-1] // 3600
is_hour_zero = True
if time_minute == 0:
is_minute_zero = True
time_minute = (time_list[-1] - time_hour * 3600) // 60
time_total_second = time_hour * 3600 + time_minute * 60 + time_second
if time_total_second < time_list[-1]:
if is_minute_zero:
time_minute += 1
elif is_hour_zero:
time_hour += 1
else:
print("Incorrect time: " + date + "[" + id + "]: " + time_list)
time_total_second = time_hour * 3600 + time_minute * 60 + time_second
time_list.append(time_total_second)
time_str = time_str[time_str.find(",") + 1:]
if timeToSecond(row[4]) <= time_list[-1]:
print("Incorrect time: " + date + "[" + id + "]")
time_list = [a - b for a, b in zip(time_list, [sec_begin] * len(time_list))]
head = row[0][:row[0].find("http")] # 行頭のメモ
head_count = -1
if head.isdecimal():
head_count = int(head)
results.append([id, date, sec_begin, timeToSecond(row[4]), count_chat, 0, int(row[2]), title, row[5], (display_counter, count_begin, head_count, time_list, counter_title), category])
count_clip += 1
if head_count >= 0: # 行頭のメモカウント機能
count_memo += head_count
count_begin += head_count
else:
count_memo += len(time_list) - 1
count_begin += len(time_list) - 1
print("count clip: " + str(count_clip))
if count_memo > 0:
print("count memo: " + str(count_memo))
for result in results: # コメント数をすべてに反映
if result[0] in dict_count_comment:
result[5] = dict_count_comment[result[0]]
return results
def getResolution(results, dir): # 全切り抜きのうち、該当数が最も多い解像度を取得
LIST_HEIGHT_16_9 = [144, 360, 480, 720, 1080, 1440, 2160, 4320] # 一般的な16:9の解像度(の高さ)一覧
list_count = [0] * len(LIST_HEIGHT_16_9)
list_resolution = []
list_duration = []
list_max_volume = []
id_current = ""
count_same_id = 0
for (id, date, sec_begin, sec_end, _, _, _, _, _, _, _) in results:
str_sec_begin = str(int(sec_begin * 1000)).zfill(8)
str_sec_end = str(int(sec_end * 1000)).zfill(8)
path = getVideoPath(dir + date + "[" + id + "]_" + str_sec_begin + "-" + str_sec_end) # glob.glob(glob.escape(dir) + "*" + glob.escape("[" + id + "]_" + str_sec_begin + "-" + str_sec_end) + "*")[0]
video = VideoFileClip(path)
index_nearest = numpy.argmin(numpy.abs(numpy.array(LIST_HEIGHT_16_9) - video.h))
list_count[index_nearest] += 1
list_resolution.append((video.w, video.h))
list_duration.append(video.duration)
list_max_volume.append(video.audio.max_volume())
if id == id_current:
count_same_id += 1
if list_max_volume[-1] < list_max_volume[-2]:
list_max_volume[-1] = list_max_volume[-2]
else:
for i in range(- count_same_id, -1):
list_max_volume[i] = list_max_volume[-1]
else:
id_current = id
count_same_id = 1
height = LIST_HEIGHT_16_9[list_count.index(max(list_count))]
width = int(height * 16 / 9)
list_target_resolution = []
for (width_original, height_original) in list_resolution:
height_target = height_original
width_target = width_original
if height_target > height or width_target > width or (height_target != height and width_target != width): # 共通解像度に一致するようサイズ変更
height_target = min(height, int(width * height_original / width_original))
width_target = min(width, int(height * width_original / height_original))
list_target_resolution.append((width_target, height_target))
return (width, height), list_target_resolution, list_duration, list_max_volume # MoviePy v1.Xまで、sizeは(x, y), target_resolutionは(y, x)だったが、v2.Xから(x, y)に統一
def generateTimestamp(results, list_duration): # タイムスタンプ用の文字列を生成 各切り抜きの開始時刻、投稿日、URL
NEWLINE = "\n" # 改行文字
SEC_CLUSTERING = 60 * 12 # 間隔が指定秒未満の場合、同じ事象に対する切り抜きだと判定。その場合、それらのタイムスタンプのカウントを同じにする (例: 1. 2-1. 2-2. 3. 4. ...)
timestamp = ""
timestamp1 = ""
timestamp_day = ""
timestamp_month = ""
timestamp_year = ""
sec_sum = 0
count_num = 0
count_sequence = 0
current_id = ""
current_sec_end = 0
current_category = ""
current_day = "0000/00/00"
current_month = "0000/00"
current_year = "0000"
for i in range(len(results)):
(id, date, sec_begin, sec_end, _, _, _, _, release_date, _, category) = results[i]
release_date = release_date.replace("\\n", " ").replace("\\s", " ") # 特殊文字を除去
url = "https://youtu.be/" + id + "?t=" + str(int(sec_begin)) + "s"
if category != current_category:
current_category = category
count_num = 0
current_id = ""
if id == current_id and sec_begin < current_sec_end + SEC_CLUSTERING:
count_sequence += 1
timestamp += secondToTime(sec_sum) + " " + category + str(count_num) + "-" + str(count_sequence) + ". " + release_date + " "
else:
count_num += 1
count_sequence = 1
current_id = id
timestamp += secondToTime(sec_sum) + " " + category + str(count_num) + ". " + release_date + " "
timestamp1 += secondToTime(sec_sum) + " " + category + str(count_num) + ". " + release_date + " "
timestamp1 += url + NEWLINE
if release_date != current_day:
current_day = release_date
timestamp_day += secondToTime(sec_sum) + " " + category + str(count_num) + ". " + current_day + " "
timestamp_day += url + NEWLINE
if release_date[:7] != current_month:
current_month = release_date[:7]
timestamp_month += secondToTime(sec_sum) + " " + category + str(count_num) + ". " + current_month + " "
timestamp_month += url + NEWLINE
if release_date[:4] != current_year:
current_year = release_date[:4]
timestamp_year += secondToTime(sec_sum) + " " + category + str(count_num) + ". " + current_year + " "
timestamp_year += url + NEWLINE
current_sec_end = sec_end
timestamp += url + NEWLINE
sec_sum += list_duration[i] # 実際の動画の duration と sec_end - sec_begin では誤差(前者が最大+0.05s程度)が発生し、累積すると数秒単位の誤差になってしまう。これを防ぐため、実際の動画の duration を使う
return timestamp, timestamp1, timestamp_day, timestamp_month, timestamp_year
def writeTimestamp(results, list_duration, path_dst_timestamp): # タイムスタンプをファイル出力
timestamp, timestamp1, timestamp_day, timestamp_month, timestamp_year = generateTimestamp(results, list_duration)
with open(path_dst_timestamp, "w") as f:
f.write(timestamp)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "1" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp1)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "_day" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp_day)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "_month" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp_month)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "_year" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp_year)
def displayText(release_date, count_chat, count_comment, yen): # 切り抜き動画中に表示する文字
DISPLAY_DATE = True # 公開日の表示オプション
DISPLAY_COUNT = True # 該当チャット数の表示オプション
DISPLAY_YEN = True # スーパーチャット金額(円)の表示オプション
JP_CHATS = "関連チャット数: "
JP_COMMENTS = "関連コメント数: "
JP_TIPPING = "スパチャ総額: ¥"
EN_CHATS = "Chats: "
EN_COMMENTS = "Comments: "
EN_TIPPING = "Tipping: ¥"
NEWLINE = "\n" # 改行文字
display_date = ""
if DISPLAY_DATE:
display_date += release_date.replace("\\n", "\n").replace("\\s", " ") # 改行を\nで、空白を\sで表現可能にする
display_text = ""
if DISPLAY_COUNT and count_chat > 0:
display_text += JP_CHATS + str(count_chat) + NEWLINE
if DISPLAY_COUNT and count_comment > 0:
display_text += JP_COMMENTS + str(count_comment) + NEWLINE
if DISPLAY_YEN and yen > 0:
display_text += JP_TIPPING + str(yen) + NEWLINE
if len(display_text) > 0:
display_text = display_text[:- len(NEWLINE)]
return display_date, display_text
def subClip(resolution, target_resolution, max_volume, title, display_date, display_text, counter, path, path_font="Courier"): # 各切り抜きのサイズを合わせ、文字とフェード効果を付与
SEC_FADEIN = 0.75 # フェードイン秒数
SEC_FADEOUT = 0.75 # フェードアウト秒数
SEC_AUDIO_FADEIN = 0.1 # 音のフェードイン秒数
SEC_AUDIO_FADEOUT = 0.1 # 音のフェードアウト秒数
COLOR_FONT = "#ffffff" # フォント色
COLOR_BACKGROUND = "#000000c0" # テキストの背景色
TEXT_MARGIN = (12, 12, 12, 12) # テキストの周りの余白 (left, top, right, bottom)
TEXT_X = 16 # テキストの左端座標(= 右端座標)
TEXT_Y = 8 # テキストの上端座標
(width, height) = resolution
(width_target, height_target) = target_resolution
videoclip = VideoFileClip(path, target_resolution=target_resolution).with_position((int((width - width_target) / 2), int((height - height_target) / 2))) # 指定の解像度で動画読み込み
fontsize = int(height / 15)
titlesize = int(fontsize * 3 / 8)
textsize = int(fontsize * 3 / 4)
countersize = int(fontsize * 3 / 2)
duration = videoclip.duration
if max_volume == 0:
max_volume = 1
list_clip = [videoclip.with_audio(videoclip.audio.subclipped(0, -0.05)).with_effects([afx.MultiplyVolume(1 / max_volume), afx.AudioFadeIn(SEC_AUDIO_FADEIN), afx.AudioFadeOut(SEC_AUDIO_FADEOUT)])] # 動画末尾の音声ノイズ対策(0, -0.05), 音量正規化(1 / max_volume)
current_x = TEXT_X
current_y = TEXT_Y
if len(title) > 0:
titleclip = TextClip(text=title, font=path_font, font_size=titlesize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND, margin=TEXT_MARGIN).with_position((current_x, current_y)).with_duration(duration)
current_y += titleclip.h
list_clip.append(titleclip)
if len(display_date) > 0:
dateclip = TextClip(text=display_date, font=path_font, font_size=fontsize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND, margin=TEXT_MARGIN, text_align="center").with_position((current_x, current_y)).with_duration(duration)
current_y += dateclip.h
list_clip.append(dateclip)
if len(display_text) > 0:
textclip = TextClip(text=display_text, font=path_font, font_size=textsize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND, margin=TEXT_MARGIN, text_align="left").with_position((current_x, current_y)).with_duration(duration)
current_y += textclip.h
list_clip.append(textclip)
current_x = width - TEXT_X
current_y = TEXT_Y
(display_counter, count_begin, _, time_list, counter_title) = counter
if counter_title != "":
counterheadclip = TextClip(text=counter_title, font=path_font, font_size=fontsize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND, margin=TEXT_MARGIN).with_duration(duration)
counterheadclip = counterheadclip.with_position((current_x - counterheadclip.w, current_y))
current_y += counterheadclip.h
list_clip.append(counterheadclip)
if display_counter:
time_list.append(duration)
count_current = count_begin
for i in range(len(time_list) - 1):
counterbodyclip = TextClip(text=(str(count_current)), font=path_font, font_size=countersize, size=(None, countersize), color=COLOR_FONT, bg_color=COLOR_BACKGROUND, margin=TEXT_MARGIN).with_duration(time_list[i + 1] - time_list[i]).with_start(time_list[i]) # .with_end(time_list[i + 1])
counterbodyclip = counterbodyclip.with_position((current_x - counterbodyclip.w, current_y))
list_clip.append(counterbodyclip)
count_current += 1
return CompositeVideoClip(clips=list_clip, size=resolution).with_effects([vfx.FadeIn(SEC_FADEIN), vfx.FadeOut(SEC_FADEOUT)])
def mergeClip(results, resolution, list_target_resolution, list_max_volume, dir, path_dst_video, remove_original, path_font): # 全切り抜きを結合
SEC_PER_PART = 60 * 30 # まとめて処理できる最大秒数。この秒数ごとの動画を生成し、最後に結合する
COUNT_PER_PART = 48 # まとめて処理できる最大動画数。この個数ごとの動画を生成し、最後に結合する
list_video = []
path_dst_video_pre = path_dst_video[:path_dst_video.rfind(".")] # 拡張子より前
path_dst_video_extension = path_dst_video[path_dst_video.rfind("."):]
part_current = 0
duration_current = 0
duration_sum = 0
count_current = 0
list_video_current = []
gc.collect()
for i in range(len(results)):
(id, date, sec_begin, sec_end, count_chat, count_comment, yen, title, release_date, counter, _) = results[i]
str_sec_begin = str(int(sec_begin * 1000)).zfill(8)
str_sec_end = str(int(sec_end * 1000)).zfill(8)
path = getVideoPath(dir + date + "[" + id + "]_" + str_sec_begin + "-" + str_sec_end) # glob.glob(glob.escape(dir) + "*" + glob.escape("[" + id + "]_" + str_sec_begin + "-" + str_sec_end) + "*")[0]
display_date, display_text = displayText(release_date, count_chat, count_comment, yen)
list_video.append(subClip(resolution, list_target_resolution[i], list_max_volume[i], title, display_date, display_text, counter, path, path_font))
duration_current += list_video[-1].duration
count_current += 1
if duration_current >= SEC_PER_PART or count_current >= COUNT_PER_PART or i == len(results) - 1: # まとめて処理できる許容量を超えた場合、現在のリスト内の動画をすべて結合して出力
print(secondToTime(duration_sum) + " -> " + secondToTime(duration_sum + duration_current))
print("[", end="")
for video in list_video:
print(secondToTime(video.duration), end=", ")
print("]")
path_current = path_dst_video_pre + "_part" + str(part_current) + path_dst_video_extension
if part_current >= 0: # ToDo: 変更を含むpartファイルのみ作成し直す。現状は、すべてのpartファイルが作成し直される
concatenate_videoclips(list_video).write_videofile(path_current, codec="mpeg4", bitrate="1000000000")
list_video.clear()
part_current += 1
duration_sum += duration_current
duration_current = 0
count_current = 0
gc.collect() # 各動画出力ごとにメモリ解放を明示的に指定
if part_current == 1: # 許容量に到達しなかった場合は結合処理が不要
os.rename(path_dst_video_pre + "_part0" + path_dst_video_extension, path_dst_video)
else:
list_part = []
for i in range(part_current):
path_current = path_dst_video_pre + "_part" + str(i) + path_dst_video_extension
videoclip = VideoFileClip(path_current)
list_part.append(videoclip.with_audio(videoclip.audio.subclipped(0, -0.1)).with_effects([afx.AudioFadeIn(0.1), afx.AudioFadeOut(0.1)])) # 動画末尾の音声ノイズ対策(0, -0.1) 0.05では消えなかったので0.1とした
concatenate_videoclips(list_part).write_videofile(path_dst_video, codec="mpeg4", bitrate="1000000000") # ToDo: codec="h264_nvenc", hevc_nvenc
list_part.clear()
if remove_original:
for i in range(part_current): # 一時ファイルをすべて削除
path_current = path_dst_video_pre + "_part" + str(i) + path_dst_video_extension
os.remove(path_current)
def execute(path_results, path_list_date_title, dir_video, path_dst_video, path_dst_timestamp, remove_original, dir_font=""):
path_font = ""
if dir_font != "":
path_font = glob.glob(glob.escape(dir_font) + "*")[0]
dict_date_title = getDictDateTitle(path_list_date_title)
results = getResults(path_results, dict_date_title)
resolution, list_target_resolution, list_duration, list_max_volume = getResolution(results, dir_video)
print("total: " + secondToTime(sum(list_duration)))
print("resolution: " + str(resolution))
writeTimestamp(results, list_duration, path_dst_timestamp)
print("timestamp: " + path_dst_timestamp)
mergeClip(results, resolution, list_target_resolution, list_max_volume, dir_video, path_dst_video, remove_original, path_font)
print("video: " + path_dst_video)
def main():
remove_original = False # 生成過程の動画を削除するか
for arg in sys.argv[1:]:
remove_original = remove_original or (arg == "-d" or arg == "-D" or arg == "--delete")
execute("extract/results.txt", "extract/list_date_title.txt", "clip/", "dst/clip.mp4", "dst/timestamp.txt", remove_original, os.path.dirname(__file__) + "/font/")
if __name__ == "__main__":
main()
すべての切り抜きで解像度を合わせる必要があるため、解像度の最頻値を最終的な切り抜きの解像度とします。
遊び心として、チャット数とスーパーチャット金額を動画内に表示する機能を付けています。
加えて、基本的な情報として、日付や動画タイトルも表示するようにしています。
また、見やすいように、結合部分にフェードを付けています。
長編の動画に対応するため、一定時間ごとに一度結合動画を作成し、最後にそれらをまとめて結合することで、処理中にメモリが不足しないようにしました。
(数時間くらいの動画なら作成できるはずです。より長い動画を考慮するなら、この階層数が変動するようにする必要がありますが、何度も結合処理をすると品質劣化につながるので、現状は2階層までの実装としています)
最後に、各切り抜きの長さからタイムスタンプを生成します。
YouTubeのチャプターに対応する形式で出力しています。
環境構築
環境構築についても簡単に触れておきます。
まずはツールを実行できる環境を整えます。
(yt-dlpとmoviepyを使えるようにします)
対応OSはLinux(私の場合はWindowsを使っているため、Ubuntu24.04LTS / WSL2)です。Windowsの場合は前述の通りWSL2を、Macの場合は少しシェルスクリプトを修正するかdockerコンテナ等でお使いください。
setup.sh
#!/bin/sh
# 作業用ルートディレクトリで実行 source setup.sh [arg1]
# 引数(任意1個): Pythonのバージョン(3.12等。デフォルト値は3)
python_version=${1:-3}
sudo apt update
sudo apt install -y "python${python_version}-venv" imagemagick ffmpeg
mkdir -p {src,data,venv}
rm -fr venv/yt-dlp_moviepy
"python${python_version}" -m venv venv/yt-dlp_moviepy
source venv/yt-dlp_moviepy/bin/activate
pip install -U pip
pip install -U yt-dlp
pip install -U moviepy
deactivate
source setup.sh
Pythonのバージョンを指定する場合 ↓
source setup.sh 3.12
pythonをインストールし、yt-dlpとmoviepyをpipでインストールしているだけです。仮想環境には標準機能のvenvを使っています。(imagemagickはmoviepyで必要です)
実行後は以下のようなディレクトリ構成になります。
.
├── data
├── setup.sh
├── src
└── venv
└── yt-dlp_moviepy
srcディレクトリはソースコード用、venvはPython仮想環境用、dataは本ツールで出力するデータ格納用です。
最後に、関連ファイル2点です。
1点目として、日本語を表示するため、日本語対応のフォントを配置しています。(日本語を表示しない場合は省略可能です)
配置場所は src/font/MPLUSRounded1c-Regular.ttf
です。
フォントファイルは、Google Fontsからダウンロードするか、上記GitHubリポジトリをクローンしてください。
もちろん、他の好きなフォントを使っても構いません。
2点目として、ログイン状態で処理できるよう、Cookieファイルを配置しています。
配置場所は src/auth/cookies.txt
です。
上記ファイルは空なので、ログインが必要な場合はブラウザ等から取得したCookieファイルで上書きします。ブラウザの拡張機能を使うと簡単に取得できます。
最後に
YouTubeライブ配信アーカイブの切り抜き動画作成を自動化するツールを作成したので紹介しました。
データのダウンロードは時間の無駄なので、YouTube上のデータを直接扱えるようなサービスが今後登場すると嬉しいですね。
今回作成したコード量で切り抜き動画作成の大部分を自動化できてしまう上、LLM等機械学習系のAI分野の技術が成熟してきているので、人間が切り抜き動画を作る時代はすぐに終わりを迎えるでしょう。
質問や指摘、要望等があれば、下のコメント欄にお願いします。
それでは、また。
コメント一覧
自動化ツールの作成と公開ありがとうございます。
当方全くプログラミングがわからず、色々調べてやってはみたのですがうまくいきません・・・。
もし可能でしたらご教授いただけると嬉しいです。
全く分からずに調べてやっているので的はずれな認識等あるかと思います・・・。
とりあえずGithubに公開していただいているデータをダウンロードし
srcフォルダのchat.shのファイルの
$1に保存先のディレクトリ
$2にチャンネルID
こちらを入力しました。しかしプログラムの実行?ができません。
というかやり方がわかりません・・・。
いろいろ調べたところ、Jupyter Labを使えばいいのかな?(多分違いますよね)と思って導入してみたのですがダメでした。
pythonの挙動もおかしいような気がしています。
管理者権限で開かないと正常に動作しません。
clustering_chat等のpythonファイルをクリックすると一瞬pythonが起動して、動作しません。
予想になってしまうんですが、pythonが問題なく動作していれば普通にプログラムも実行されるのかな?とは思ったりしています。
ただ現状なにが正しくできていて、なにが間違っているのかもわからないため、どこに原因があるのかがわかりません・・・。
これだけの情報で伝わるのかもわかりませんが、どの辺りに問題がありそうなのかだけでも教えていただけたら嬉しいです・・・。
プログラムの公開と詳細があるにも関わらず、こんなにも分からないものかと自分でも驚きました・・・。
お手数ですがご確認お願いします。
返信が遅くなり申し訳ありません。
リポジトリは常に更新しているので、今の最新版で試していたでけないでしょうか。
もしかしたら、実行する位置が違う可能性があります。ダウンロードしたらそのディレクトリに入り、そこで src/chat.sh などのコードを実行してください。
また、pythonが管理者権限でないと実行できないのはおかしいです。本ツールはコマンドラインから実行することを想定していますが、何かのツールに依存した実行方法をとっていませんか。
あと、トラブルシューティングは詳細な状況が分からなければ解決が難しいため、エラーが発生したならその内容や、今まで実行した内容をそのままコピペしていただきたいです。