こんにちは。
現在、YouTubeでは長時間の配信を多く見かけるようになり、その結果切り抜き動画の需要が高まっているように思います。
今回、切り抜き動画作成の大部分を自動化するツールを作成したので紹介します。
切り抜き箇所の候補を自動的に見つけたかったため、まずその部分を自動化し、ついでにその他動画編集部分なども自動化したという背景のもと作成されたツールです。
主に以下の処理を自動化しました。
- ライブ配信のアーカイブからチャットを取得する
- 特定のキーワードに該当するチャットを抽出する
- 該当の動画をダウンロードする
- 動画編集(切り抜き、結合)する
手動で実施する必要のある処理としては以下が残っています。
- 切り抜き候補を実際の動画で確認し、切り抜き開始時刻と終了時刻を正確に指定する
また、本記事で紹介するコードはすべてGitHub上にも公開しています。
更新履歴
日付 | ver. | 更新内容 |
---|---|---|
2024/08/18 | v1.4.0 | チャットデータのダウンロード時にもCookieファイルを参照するよう変更、動画全編ダウンロード時にCookieファイルが参照されない問題の修正、月ごとと年ごとのタイムスタンプ出力機能追加 |
2024/07/31 | v1.3.0 | 音量正規化機能追加、シンボリックリンクでフォントファイルが読み込まれない問題の修正、完成動画の一時ファイルを削除するよう変更、各動画出力後のメモリ解放を明示的に指定、タイムスタンプを先に出力するよう変更、動画合計時間と現在の動画処理時間の標準出力機能追加、results.txtの行頭に回数メモ機能追加、results.txtにコメントアウトの行を入れられるよう変更、動画全編読み込み時のエラー修正、Cookieファイルによりログイン状態で処理できるよう修正 |
2024/06/12 | v1.2.0 | 切り抜き周辺のみのダウンロード機能追加、最高品質がmp4以外の場合に対応、タイムスタンプの同一事象連番対応 |
2024/05/19 | v1.1.0 | タイムスタンプの誤差が累積する問題の修正、表示日付を投稿日から公開日に変更、公開日を任意の文字列に編集できるよう変更(list_date_title.txtの必須化) |
2024/05/10 | v1.0.0 | 大幅改修(sum_superchat.pyの削除(video.shの検索文字列不要化)、文字列検索のpython完結化、results.txtへの全情報集約化、クラスタリングアルゴリズム一部変更(sum_superchat.py削除による、該当チャット直前数秒間のチャット取得のためのリングバッファ追加)、タイトル表示機能追加、切り抜き並べ替え対応、各切り抜き動画ファイル名のmsec化(時刻編集時に以前の切り抜きの削除不要化)、異なるアスペクト比の結合対応、ディレクトリのシンボリックリンク対応、円以外の通貨50種対応、results.txtの空白行許容対応、results.txtの上書き防止、ImageMagickとMoviePyのエラー回避、音のフェード対応) |
2024/03/10 | コメントからの候補抽出機能追加、該当コメント数表示対応、時刻の小数対応 | |
2024/02/27 | チャットダウンロードのみの実行機能追加 | |
2024/02/22 | チャット取得時に前半の一部しか取得されない問題の修正、ダウンロード再試行回数指定機能追加、ダウンロード失敗時のエラー回避 | |
2024/02/20 | 初版として公開 | |
2024/02/11 | 簡易版完成、初試用 | |
2024/02/10 | 開発開始 |
使用したパッケージ
チャットと動画のダウンロード
チャットと動画のダウンロードにはyt-dlp
を使用しました。
このツールを使うと、YouTube(に限らず多くの動画配信プラットフォーム)の大体のデータを取得することができます。
aptやpip等のパッケージマネージャでインストールできるため扱いやすいです。もちろんGitから取得しても良いです。
例えば、YouTubeのあるチャンネル内の動画すべてのチャットとコメントを取得するには、以下のコマンドを一行実行するだけで実現できます。
yt-dlp --skip-download --write-subs --write-comments -o "%(upload_date)s[%(id)s]" チャンネルURL
--write-subs
でライブチャットを取得、--write-comments
でコメントを取得、--skip-download
で動画のダウンロードをスキップ、-o
で出力名のパターンを指定しています。
動画をダウンロードしたい場合は--skip-download
をつけなければ良いです。
このように、コマンド一行で非常に簡単にデータを取得できるようになっています。
オプションも多数用意されているため、カスタマイズ性が高い点も魅力です。
因みに、pipでインストールした場合はpythonコード内のメソッドからも実行可能です。
YouTube Data APIについて
もともとチャットの取得には、Googleが提供するYouTube Data APIを使う予定でした。
しかし、このAPIは現在配信中のライブのチャットを取得することはできますが、過去のアーカイブには非対応でした。
そういう訳で、この方法では動画のURLリストを取得するくらいしかできず、チャットの取得には別の手段をとる必要があったため断念しました。(クローラを用意して実際にページにアクセスするなど)
動画編集
動画編集にはmoviepy
を使用しました。
こちらはよく使われている安定のパッケージだと思います。
動画編集に必要な機能をほとんど備えています。(バグがとても多いですが…)
例えば、今回使用している動画の切り抜きと結合は、以下のように非常に分かりやすい記述で実現できます。
切り抜き
video.subclip(sec_begin, sec_end)
結合
concatenate_videoclips(list_video)
ツール紹介: コード、使い方、実行例
ここからは、作成したツールを使用した実際の切り抜き動画の作成例を紹介し、ソースコードの解説をします。
紹介するコードはGitHub上にも公開しています。
(本記事では、可読性と再現性の担保のため、極力ファイル分割やimportを使用しないようにしています。その結果、重複するコードが何度か出てきますがご了承ください。
通常版(最新版)は上記GitHubのmainブランチ、本記事とまったく同じコードはdiary-039ブランチにあります。詳細はREADMEをご確認ください)
できること
まずは、できることを簡潔に説明します。
本ツールは、特定のチャンネル内から特定のシーンを抽出し、それらを結合したまとめ動画を作ります。
以下の手順で実施します。
- チャットの取得、抽出、クラスタリング
コマンド1行を実行し、切り抜き箇所の候補を抽出します。 - 実際にYouTubeで確認
上記の候補を実際にYouTubeで確認し、切り抜き開始時刻と終了時刻を正確に指定します。ここは手動で作業する必要があります。 - 動画ダウンロード、動画編集
コマンド1行を実行し、上記の手動で編集した切り抜き箇所をダウンロードし切り抜きを結合した動画を生成します。
以上で切り抜き動画が完成します。
詳しくは後述しますが、こんな感じの動画が生成されます。
手動で確認する部分はチャンネルや切り抜きの期間によっては大変になりますが、その他は自動化されるようにしました。
以下では実行例の紹介と、ソースコードの解説をします。
(具体的な使い方はREADMEに記載されているため省略します)
実行例
実行例としては、VTuberのくしゃみ切り抜き動画の生成過程を紹介します。
幸いなことに(?)、VTuber界隈には「くしゃみ助かる」という文化があるためチャットから判断しやすく、かつ頻度がテスト用に適しています。
例として、にじさんじの「月ノ美兎」(委員長)さんの切り抜きの様子を見てみましょう。
まずは、チャットの取得、抽出、クラスタリングをします。
実行コマンド
source src/chat.sh data/20240506_tsukino UCD-miitqNY3nyukJ4Fnf4_A "くしゃみ|クシャミ|嚏|嚔|くしゃたす|くしゃ民|Bless you|bless you|ぶれすゆ|ブレスユ" "くしゃみ|クシャミ|嚏|嚔|助か|たすか|くしゃたす|くしゃ民|浴びた|あびた|tskr|TSKR|Bless you|bless you|ぶれすゆ|ブレスユ|sneez|kushami|花粉|連|続|回|大丈夫|お大事に|可愛い|かわいい|かわよ|ミュート|((^(は|ハ|ひ|ヒ|へ|ヘ|べ|ベ|ふぇ|フェ|ぶぇ|ブェ)(っ*|ッ*)[きキくク][しシちチ])|(^[くク][しシちチ][ゅュょョ][んン]))|こより|achoo|ACHOO|幫大忙了"
検索文字列(厳しめ)は条件を緩くし過ぎると候補の数が膨大になってしまうので、ある程度条件を厳しめにするのが良いでしょう。ひらがな、カタカナ、漢字、その他いくつかの関連語のorとしています。
(この文字列が投稿された周辺を切り抜き候補とします)
検索文字列(緩め)は単にチャット数の集計用に使われるだけなので、集計に含めるパターンをある程度網羅的に含めました。
日付は指定していないので全期間の動画が対象です。
ダウンロードには1日程度かかったと思います。(ここが最も時間のかかる処理だと思われます)
ただ、データ量はそれほど大きいわけではないため、ネットワークを圧迫する心配はありません。(実行例では24GB程度なので、速度は約1~3Mbit/sです)
実行後は以下のようなディレクトリ構成になります。
.
├── data
│ └── 20240506_tsukino
│ ├── extract
│ │ ├── list_date_title.txt
│ │ └── results.txt
│ └── live_chat
│ ├── ...
│ ├── 20180314[TqOm7Lg1xCY].info.json
│ ├── 20180314[TqOm7Lg1xCY].live_chat.json
│ ├── ...
├── setup.sh
├── src
│ ├── auth
│ │ └── cookies.txt
│ ├── chat.sh
│ ├── clustering_chat.py
│ ├── download_clip.py
│ ├── edit_video.py
│ ├── font
│ │ └── MPLUSRounded1c-Regular.ttf
│ └── video.sh
└── venv
└── yt-dlp_moviepy
data/20240506_tsukino/extract/results.txt
に候補のリストが保存されています。
実行例では、候補が356箇所にクラスタリングされました。
この結果をもとに実際にYouTubeで確認し、results.txtを編集します。
results.txt 編集後 (17行目~22行目抜粋)
https://youtu.be/MbG5iQzXYvw?t=792s 496 3934 0:13:42 0:22:53 2019/01/28 くしゃみ民 くしゃ民に餌を与えないでください くしゃ民は号が深すぎるから来世には期待しないで
https://youtu.be/xXU8E-fOqKc?t=0s 16 0 0:00:00 0:00:00 2019/02/12 委員長くしゃみした後、寒さを共有したとかとっさに言えるとこプロだなと思いました! 委員長のくせにかわいいくしゃみするな 7:26 くしゃみたすかる くしゃみかわいいのやめろ💢💢💢💢💢💢💢💢💢好きになっちゃうだろ💢💢💢💢 くしゃみが可愛かったからこの月ノ美兎は多分ニセモノ 7:28 くしゃみかわいすぎるw くしゃみ民大歓喜コロンビアァ… 委員長のくしゃみわいの顔にしてほしい 委員長のくしゃみにがち恋しました 自分用 クシャミ 7:27 07:25 可愛いくしゃみ 永久保存版\n31:11 可愛い「はーいはーい」からのVtuber実績アピール 「くしゃみたすかる」ってコメント何すか 笑 0:35 開始\n7:28 くしゃみ(どのアマガミキャラよりかわいい)\nついに初期の配信で話していたこのゲームをやる時が来たか…しかも委員長対決!\nマウント取るの失敗してるし委員長らしさでは完敗してるけど裏があるのは共通点なのかな? 07:28 くしゃみ すげぇツバ飛びそうなくしゃみ こういうくしゃみ好き
1https://youtu.be/xXU8E-fOqKc?t=424s 369 0 0:07:25 0:07:39 2019/02/12 くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみ助かる うっわかわいいくしゃみ くしゃみたすかる くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみたすかる くしゃみたすかる くしゃみたすかる くしゃみたすかる くしゃみ助かる 中途半端なくしゃみたすかる くしゃみ民歓喜 くしゃみたすかる くしゃみ助かる そら寒空にいたらクシャミも出るわ かわいいくしゃみでたなぁ くしゃみ助かる くしゃみ頂きました
0https://youtu.be/xXU8E-fOqKc?t=424s 369 0 0:08:05 0:08:13 2019/02/12 くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみ助かる うっわかわいいくしゃみ くしゃみたすかる くしゃみ助かる くしゃみ助かる くしゃみ助かる くしゃみたすかる くしゃみたすかる くしゃみたすかる くしゃみたすかる くしゃみ助かる 中途半端なくしゃみたすかる くしゃみ民歓喜 くしゃみたすかる くしゃみ助かる そら寒空にいたらクシャミも出るわ かわいいくしゃみでたなぁ くしゃみ助かる くしゃみ頂きました
https://youtu.be/xXU8E-fOqKc?t=979s 94 0 0:16:49 0:23:25 2019/02/12 委員長くしゃみして
https://youtu.be/xXU8E-fOqKc?t=3129s 169 0 0:52:39 1:00:13 2019/02/12 委員長のくしゃみの方が可愛かったぞ
行頭に数字(くしゃみの回数)を追記している行が編集した行です。開始時刻と終了時刻を編集しています。
手動作業が必要なのは、実際にYouTubeで確認するこの部分のみです。
次に、編集した内容をもとに、元動画のダウンロードと切り抜き動画の生成をします。
実行コマンド
source src/video.sh data/20240506_tsukino
生成された動画は以下の通りです。
タイムスタンプは以下のように出力されます。
このタイムスタンプをYouTubeの概要欄に貼り付けると、チャプターが作成されます。
timestamp.txt (先頭4行)
0:00:00 1. 2018/09/01 https://youtu.be/yyoxSjERfUc?t=7978s
0:00:11 2. 2019/02/12 https://youtu.be/xXU8E-fOqKc?t=445s
0:00:25 3. 2019/02/12 https://youtu.be/xXU8E-fOqKc?t=485s
0:00:33 4. 2019/08/01 https://youtu.be/_ad-QD6NLOA?t=530s
実行後のディレクトリ構成は以下の通りです。
.
├── data
│ └── 20240506_tsukino
│ ├── clip
│ │ ├── 20180716[xqkuRNaMXjM]_04558000-04715000.mp4
│ │ ├── 20180901[yyoxSjERfUc]_07978000-07989000.mp4
│ │ ├── 20190212[xXU8E-fOqKc]_00445000-00459000.mp4
│ │ ├── 20190212[xXU8E-fOqKc]_00485000-00493000.mp4
│ │ ├── ...
│ ├── download
│ │ ├── 20180716[xqkuRNaMXjM].mp4
│ │ ├── ...
│ ├── dst
│ │ ├── clip.mp4
│ │ ├── timestamp.txt
│ │ ├── timestamp1.txt
│ │ ├── timestamp_month.txt
│ │ └── timestamp_year.txt
│ ├── extract
│ │ ├── list_date_title.txt
│ │ └── results.txt
│ └── live_chat
│ ├── ...
│ ├── 20180314[TqOm7Lg1xCY].info.json
│ ├── 20180314[TqOm7Lg1xCY].live_chat.json
│ ├── ...
├── setup.sh
├── src
│ ├── auth
│ │ └── cookies.txt
│ ├── chat.sh
│ ├── clustering_chat.py
│ ├── download_clip.py
│ ├── edit_video.py
│ ├── font
│ │ └── MPLUSRounded1c-Regular.ttf
│ └── video.sh
└── venv
└── yt-dlp_moviepy
以上、実際にくしゃみ切り抜き動画を作成した例でした。
ソースコード解説
ここからは、ソースコードの処理を簡単に説明します。
チャットの取得、抽出、クラスタリング
チャットとコメントの取得はyt-dlpのコマンドで実行し、その後以下のpythonコードでチャットを1事象ごとにクラスタリングし、候補をリスト化して出力します。
chat.sh
#!/bin/sh
# 作業用ルートディレクトリで実行 source src/chat.sh arg1 arg2 [arg3 arg4 arg5 arg6]
# 引数(必須2個 + 任意4個): 作成するディレクトリのパス, YouTubeのChannel ID, 検索文字列(厳しめ), 検索文字列(緩め), 取得開始日(YYYYMMDD), 取得終了日(YYYYMMDD)
current_dir=`pwd`
source venv/yt-dlp_moviepy/bin/activate
if [ ! -d "$1"/live_chat ]
then
mkdir -p "$1"/live_chat
cd "$1"/live_chat
if [ $# -ge 6 ]
then
yt-dlp --skip-download --write-subs --write-comments --cookies "${current_dir}"/src/auth/cookies.txt --dateafter $5 --datebefore $6 -o "%(upload_date)s[%(id)s]" "https://www.youtube.com/channel/$2"
elif [ $# == 5 ]
then
yt-dlp --skip-download --write-subs --write-comments --cookies "${current_dir}"/src/auth/cookies.txt --dateafter $5 -o "%(upload_date)s[%(id)s]" "https://www.youtube.com/channel/$2"
else
yt-dlp --skip-download --write-subs --write-comments --cookies "${current_dir}"/src/auth/cookies.txt -o "%(upload_date)s[%(id)s]" "https://www.youtube.com/channel/$2"
fi
cd "${current_dir}"
fi
if [ $# -ge 3 ] && [ ! -e "$1"/extract/results.txt ]
then
mkdir -p "$1"/extract
cd "$1"
if [ $# -ge 4 ]
then
python "${current_dir}"/src/clustering_chat.py "$3" "$4"
else
python "${current_dir}"/src/clustering_chat.py "$3"
fi
cd "${current_dir}"
fi
deactivate
このシェルスクリプトでは、yt-dlpコマンドで指定チャンネル内すべての動画のチャットとコメントを取得し、その後後述のpythonスクリプトを実行するところまで自動化します。
clustering_chat.py を表示
import sys # argv
import os # listdir
import re # search
import unicodedata # normalize
def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
begin = str.find(str_begin) + len(str_begin)
if begin < len(str_begin):
return ""
end = str[begin:].find(str_end) + begin
return str[begin:end]
def subStrEnd(str, str_begin, str_end): # 該当範囲の文字列を切り出し(終了文字列から検索)
end = str.find(str_end)
if end < 0:
return ""
begin = str[:end].rfind(str_begin) + len(str_begin)
return str[begin:end]
def getId(filename): # ファイル名からVideoIDを抽出
return subStrEnd(filename, "[", "]")
def getDate(filename): # ファイル名から投稿日を抽出
return subStrEnd(filename, "/", "[")
def getTitle(line): # 生データからタイトルを抽出
return subStrBegin(line, '"title": "', '"')
def getReleaseDate(line): # 生データから公開日を抽出
return subStrBegin(line, '"release_date": "', '"')
def getSecond(line): # 生データから時刻を抽出
str = subStrBegin(line, '"videoOffsetTimeMsec": "', '"')[:-3] # mili sec で記載されているため下3文字削る
if str == "":
return 0
return int(str)
def getText(line): # 生データからチャットを抽出
text = ""
str_begin = '"text": "'
str_end = '"'
while True: # チャットがいくつかに分割される場合があるためすべて抽出して結合
begin = line.find(str_begin) + len(str_begin)
if begin < len(str_begin):
break
end = line[begin:].find(str_end) + begin
text += line[begin:end]
line = line[end + len(str_end):]
return text
def exchangeToYen(amount): # 円に両替した場合の金額
YEN_PER = {"¥": 1, "$": 150, "€": 160, "₩": 0.1, "£": 190, "₱": 2.5, "₹": 1.8, "₫": 0.006, "₪": 41, "MYR": 32, "CA$": 110, "NT$": 4.7, "THB": 4, "HK$": 19, "A$": 97, "ARS": 0.17, "PLN": 37, "MX$": 8.7, "CLP": 0.15, "IDR": 0.01, "SGD": 110, "R$": 30, "ZAR": 7.9, "RON": 32, "NOK": 14, "NZ$": 90, "RUB": 1.6, "DKK": 22, "SEK": 14, "CHF": 170, "PEN": 40, "CRC": 0.3, "RSD": 1.4, "UYU": 3.8, "DOP": 2.5, "ISK": 1.1, "SAR": 40, "HUF": 40, "CZK": 6.4, "BGN": 82, "BYN": 45, "GTQ": 19, "BOB": 21, "PYG": 0.02, "TRY": 4.6, "COP": 0.038, "HRK": 20, "AED": 40, "KES": 1, "NIO": 4, "ден": 2.6}
# 為替レートは2024/03/10時点、有効数字2桁程度。未記載の通貨は0円で換算
amount_normalized = unicodedata.normalize("NFKC", amount).replace(" ", "").replace(",", "")
if amount_normalized == "":
return 0
for symbol in YEN_PER:
if amount_normalized[:len(symbol)] == symbol:
return float(amount_normalized[len(symbol):]) * YEN_PER[symbol]
return 0
def getYenSuperchat(line): # 生データからスーパーチャット金額を抽出
return exchangeToYen(subStrBegin(line, '"purchaseAmountText": {"simpleText": "', '"'))
def isInfo(filename): # コメントならTrue, チャットならFalseを返す
return subStrBegin(filename, "].", ".") == "info"
def isValidChat(line): # データが重複していないか確認
if line.find("addChatItemAction") < 0:
return False
return True
def containStr(text, query): # 文字列が含まれるか判定
return re.search(query, text) != None
def getCommentList(line, query): # 生データから該当コメントを抽出してリスト化
list_comment = []
str_begin = '"text": "'
str_end = '"'
while True: # 全コメントを抽出
begin = line.find(str_begin) + len(str_begin)
if begin < len(str_begin):
break
end = line[begin:].find(str_end) + begin
comment = line[begin:end]
if containStr(comment, query):
list_comment.append(comment)
line = line[end + len(str_end):]
return list_comment
def secondToTime(second_src): # 秒数(int)から時間表示(str)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
second = second_src % SECOND_PER_MINUTE
second_src = int((second_src - second) / SECOND_PER_MINUTE)
minute = second_src % MINUTE_PER_HOUR
second_src = int((second_src - minute) / MINUTE_PER_HOUR)
hour = second_src
return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)
def timeToDisplayTime(str): # 時間を表示用に変換(yyyyMMdd -> yyyy/MM/dd)
return str[0:4] + "/" + str[4:6] + "/" + str[6:8]
def clusteringChat(dir, list_query): # チャットを1事象ごとにクラスタリング
SEC_CLUSTERING = 90 # チャット間隔が90秒未満の場合、同じ事象に対するコメントだと判定(初期値)
SEC_PRE = 15 # lv0チャット1個目の何秒前からチャット数を数えるか
MAX_EXAMPLE_CHAT = 24 # 参考例として出力する該当チャット数の最大値
MAX_EXAMPLE_COMMENT = 24 # 参考例として出力する該当コメント数の最大値
results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: チャット/コメント数, [n][5][m]: チャット/コメント例 (m <= 12), [n][6]: 投げ銭金額, [n][7]: 公開日
list_date_title = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: タイトル
query_lv = [""] * 3 # 検索条件厳しい順
for i in range(len(list_query)):
query_lv[i] = list_query[i]
for i in range(len(list_query), len(query_lv) - 1):
query_lv[i] = query_lv[len(list_query) - 1]
list_file = os.listdir(dir)
list_file.sort() # 日付順に並び替え 日付が同じ場合はID順
BUF_SIZE = 1000 # 下記リングバッファのサイズ
for filename in list_file:
buf_lv1_count = [(0, 0)] * BUF_SIZE # query_lv[1] に該当するチャットの秒数と金額を保持するリングバッファ
buf_lv1_itr = 0 # 上記リングバッファのイテレータ
sec_end_same = 0 # 同じ事象だと判定される期限
release_date = "00000000"
with open(dir + filename) as f:
# 入力データは、1チャットにつき1行。コメントは全データが1行
id = getId(filename)
date = getDate(filename)
if date.isdecimal() and int(release_date) < int(date): # 投稿日 < 公開日 が成り立つように。ただし、infoファイルから読み込まれるはずなので、このif文の有無で結果は変わらないはず
release_date = date
if isInfo(filename): # コメントの場合、開始終了秒数を0として追加
for line in f:
list_date_title.append((id, date, getTitle(line)))
if getReleaseDate(line) != "":
release_date = getReleaseDate(line)
if query_lv[0] != "":
list_comment = getCommentList(line, query_lv[0])
if len(list_comment) > 0: # コメントに検索文字列が含まれていない場合は除外
results.append([id, date, 0, 0, len(list_comment), list_comment[0:MAX_EXAMPLE_COMMENT], 0, release_date])
else: # チャットの場合
for line in f:
if not isValidChat(line): # チャットが既存データと重複する場合は除外
continue
chat_text = getText(line)
yen = getYenSuperchat(line)
contain_lv1 = query_lv[0] != "" and containStr(chat_text, query_lv[1])
if not contain_lv1 and yen <= 0: # チャットに検索文字列が含まれていない場合は除外
continue
chat_second = getSecond(line)
is_same = len(results) > 0 and results[-1][0] == id and results[-1][3] > 0 and chat_second <= sec_end_same
contain_lv0 = query_lv[0] != "" and containStr(chat_text, query_lv[0])
if contain_lv0 and not is_same:
# チャット間隔が SEC_CLUSTERING 秒未満の場合、同じ事象に対するコメントだと判定
count_pre = 0 # 最初のlv0チャット以前のlv1チャット数 (SEC_PRE 秒前まで)
yen_pre = 0
for itr in range(buf_lv1_itr - 1, buf_lv1_itr - 1 - BUF_SIZE, -1):
if buf_lv1_count[itr][0] <= max(0, chat_second - SEC_PRE):
break
count_pre += 1
yen_pre += buf_lv1_count[itr][1]
results.append([id, date, chat_second, chat_second, count_pre, [], yen_pre, release_date])
sec_end_same = 0
is_same = True
if is_same:
results[-1][3] = max(chat_second, results[-1][3])
results[-1][4] += 1
results[-1][6] += yen
if len(results[-1][5]) < MAX_EXAMPLE_CHAT and contain_lv0: # チャット例を MAX_EXAMPLE_CHAT 個まで保持
results[-1][5].append(chat_text)
sec_end_same += (chat_second + SEC_CLUSTERING - sec_end_same) * SEC_CLUSTERING / (SEC_CLUSTERING + (chat_second - results[-1][2]) * 4) # 期限の加算時間を徐々に小さくする
else:
buf_lv1_count[buf_lv1_itr] = (chat_second, yen)
buf_lv1_itr = (buf_lv1_itr + 1) % BUF_SIZE
return results, list_date_title
def writeResults(results, path): # 結果を出力: URL チャット/コメント数 金額 開始秒数 終了秒数 投稿日 チャット例
DELIMITER = " " # 出力時の区切り文字
NEWLINE = "\n" # 改行文字
SEC_BUFFER = 30 # チャット1個目の何秒前から動画を確認するか
with open(path, "w") as f:
for (id, _, sec_begin, sec_end, count, list_text, yen, release_date) in results:
second = max(sec_begin - SEC_BUFFER, 0)
f.write("https://youtu.be/" + id + "?t=" + str(second) + "s" + DELIMITER)
f.write(str(count) + DELIMITER)
f.write(str(int(yen)) + DELIMITER)
f.write(secondToTime(sec_begin) + DELIMITER)
f.write(secondToTime(sec_end) + DELIMITER)
f.write(timeToDisplayTime(release_date) + DELIMITER)
for text in list_text:
f.write(text + DELIMITER)
f.write(NEWLINE)
def writeListDateTitle(list_date_title, path): # ID,日付,タイトルを出力
DELIMITER = " " # 出力時の区切り文字
NEWLINE = "\n" # 改行文字
with open(path, "w") as f:
for (id, date, title) in list_date_title:
f.write(id + DELIMITER)
f.write(date + DELIMITER)
f.write(title + NEWLINE)
def execute(dir_src, path_results, path_list_date_title, list_query):
results, list_date_title = clusteringChat(dir_src, list_query)
writeResults(results, path_results)
writeListDateTitle(list_date_title, path_list_date_title)
def main():
execute("live_chat/", "extract/results.txt", "extract/list_date_title.txt", sys.argv[1:])
if __name__ == "__main__":
main()
まず、チャットやコメントを含むjsonファイルから、チャット内容や投稿時刻、スーパーチャット金額などを文字列判定で抽出します。
例えば、チャットテキストなら "text": "チャットテキスト"
というような具合です。
さらに、チャットとコメントのデータから正規表現で文字列検索し、該当チャットを抽出します。
該当チャットの間隔が90秒以内の場合、同じ事象に対するチャットであると判断しています。
正確には、事象発生からの経過時間に応じて、同じ事象と判定される間隔が短くなるようにしています。
最初の検索文字列(厳しめ)が投稿される直前の検索文字列(緩め)もカウントしたいので、リングバッファに直近のチャットを保持しておき、検索文字列(厳しめ)が発生した時点から少し前までの検索文字列(緩め)の数もカウントするようにしています。
余談ですが、上記の結果として出力されるresults.txt編集時、テキストエディタとしてVSCodeを利用すると、Ctrlを押しながらリンクをクリックすればブラウザで該当URLのページが開かれるので便利です。
(results.txtの動画URLは、検索文字列(厳しめ)が発生した時点の30秒前から再生されるようにしています)
動画ダウンロード、動画編集
ここからは、先程編集した結果をもとに切り抜き動画を生成します。
まず、後述のpythonスクリプトを自動化するシェルスクリプトです。
video.sh
#!/bin/sh
# 作業用ルートディレクトリで実行 source src/video.sh arg1 [option]
# 引数(必須1個 + オプション): 作成したディレクトリのパス, オプション (-a: 動画全編をダウンロードする場合, -d: ダウンロードした元動画を削除する場合)
current_dir=`pwd`
source venv/yt-dlp_moviepy/bin/activate
cd "$1"
mkdir download clip
python "${current_dir}"/src/download_clip.py ${@:2}
mkdir dst
python "${current_dir}"/src/edit_video.py
cd "${current_dir}"
deactivate
最初に、動画のダウンロードと切り抜きをします。
download_clip.py を表示
import sys # argv
import os # remove, path.isfile
import operator # itemgetter
import csv # reader
import glob # glob, escape
import yt_dlp # YoutubeDL
from moviepy.editor import *
def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
begin = str.find(str_begin) + len(str_begin)
if begin < len(str_begin):
return ""
end = str[begin:].find(str_end) + begin
return str[begin:end]
def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
DELIMITER = ":" # 時間表示の区切り文字
index_delimiter = str.find(DELIMITER)
hour = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
index_delimiter = str.find(DELIMITER)
minute = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
second = float(str)
return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second
def secondToTime(second_src): # 秒数(float)から時間表示(str)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
second_src = round(second_src) # 小数点以下は四捨五入
second = second_src % SECOND_PER_MINUTE
second_src = int((second_src - second) / SECOND_PER_MINUTE)
minute = second_src % MINUTE_PER_HOUR
second_src = int((second_src - minute) / MINUTE_PER_HOUR)
hour = second_src
return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)
def getVideoPath(filename): # 拡張子も含めた動画のパスを取得
list_extension = ("mp4", "webm", "mkv")
for extension in list_extension:
path = filename + "." + extension
if os.path.isfile(path):
return path
print("file not found: " + filename)
return ""
def getDictDateTitle(path): # ファイルから日付とタイトルの辞書を取得
if not os.path.isfile(path):
return {}
dict_date_title = {}
with open(path) as f:
for line in f:
dict_date_title[line[:11]] = (line[12:20], line[21:-1])
return dict_date_title
def getResults(path, dict_date_title): # ファイルから結果を取得
DELIMITER = " " # 区切り文字
results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数
count_clip = 0
count_memo = 0
with open(path) as f:
reader = csv.reader(f, delimiter=DELIMITER)
for row in reader:
if len(row) < 6:
continue
if row[0][:4] != "http" and "http" in row[0]: # 行頭に変更がなければ除外
id = subStrBegin(row[0], "youtu.be/", "?")
date = row[5]
if id in dict_date_title:
date = dict_date_title[id][0]
results.append((id, date, timeToSecond(row[3]), timeToSecond(row[4])))
count_clip += 1
if row[0][:row[0].find("http")].isdecimal(): # 行頭のメモカウント機能
count_memo += int(row[0][:row[0].find("http")])
print("count clip: " + str(count_clip))
if count_memo > 0:
print("count memo: " + str(count_memo))
return results
def printDurationFromResults(results):
duration_sum = 0
print("[", end="")
for (_, _, sec_begin, sec_end) in results:
print(secondToTime(sec_end - sec_begin), end=", ")
duration_sum += sec_end - sec_begin
print("]")
print("total: " + secondToTime(duration_sum))
def downloadAllAndClip(results, dir_download, dir_clip, remove_original, cookiefile): # 各動画のダウンロードと切り抜き
option = {
"outtmpl": dir_download + "%(upload_date)s[%(id)s].%(ext)s", # 出力形式 投稿日[動画ID].mp4
#"format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b", # Download the best mp4 video available, or the best video if no mp4 available
"ignoreerrors": True, # エラーを無視して続行
"cookiefile": cookiefile,
}
MAX_RETRY_DOWNLOAD = 6 # ダウンロードに失敗した際の最大再試行回数
results.sort(key=operator.itemgetter(1, 0, 2, 3)) # 日付順で、動画ごとにすべての切り抜きを作成
len_results = len(results)
with yt_dlp.YoutubeDL(option) as ydl:
for i in range(len_results):
(id, date, sec_begin, sec_end) = results[i]
url = "https://www.youtube.com/watch?v=" + id
filename = date + "[" + id + "]"
filename_download = dir_download + filename
path_download = ""
str_sec_begin = str(int(sec_begin * 1000)).zfill(8)
str_sec_end = str(int(sec_end * 1000)).zfill(8)
path_clip = dir_clip + filename + "_" + str_sec_begin + "-" + str_sec_end + ".mp4"
if not os.path.isfile(path_clip): # 出力先ファイルが既に存在する場合は動画生成しない
retry_download = MAX_RETRY_DOWNLOAD
while retry_download > 0:
path_download = getVideoPath(filename_download)
if path_download != "": # 存在しない場合のみダウンロード
break
retry_download -= 1
ydl.download([url])
if path_download != "": # ダウンロードに失敗した場合はスキップ
if sec_begin < sec_end: # 時刻指定に誤りがある場合はスキップ
videoclip = VideoFileClip(path_download).subclip(sec_begin, sec_end)
videoclip.write_videofile(path_clip, codec="mpeg4", bitrate="1000000000")
if remove_original and (i == len_results - 1 or id != results[i + 1][0]): # 用済みになった動画は、オプションがTrueなら削除
if os.path.isfile(path_download):
os.remove(path_download)
def downloadOnlyClip(results, dir_download, dir_clip, remove_original, cookiefile): # 切り抜き部分のみをダウンロード
MAX_RETRY_DOWNLOAD = 6 # ダウンロードに失敗した際の最大再試行回数
SEC_INTERVAL_BEGIN_DOWNLOAD = 12 # 切り抜き前に指定秒数を追加した部分をダウンロード
SEC_INTERVAL_END_DOWNLOAD = 4 # 切り抜き後に指定秒数を追加した部分をダウンロード
SEC_INTERVAL_BEGIN_CLIP = 4 # 前に指定秒数以上が存在するダウンロード動画から切り抜きを作成
SEC_INTERVAL_END_CLIP = 0.1 # 後に指定秒数以上が存在するダウンロード動画から切り抜きを作成
results.sort(key=operator.itemgetter(1, 0, 2, 3)) # 日付順にダウンロード
len_results = len(results)
for i in range(len_results):
(id, date, sec_begin, sec_end) = results[i]
if sec_begin >= sec_end: # 時刻指定に誤りがある場合はスキップ
continue
url = "https://www.youtube.com/watch?v=" + id
filename = date + "[" + id + "]"
filename_download_pre = dir_download + filename
path_download = ""
str_sec_begin = str(round(sec_begin * 1000)).zfill(8)
str_sec_end = str(round(sec_end * 1000)).zfill(8)
path_clip = dir_clip + filename + "_" + str_sec_begin + "-" + str_sec_end + ".mp4"
if not os.path.isfile(path_clip): # 出力先ファイルが既に存在する場合は動画生成しない
retry_download = MAX_RETRY_DOWNLOAD
sec_begin_download = 0
sec_end_download = 0
while retry_download > 0:
for downloaded in glob.glob(glob.escape(filename_download_pre) + "*"):
index_extension = downloaded.rfind(".")
index_id_end = downloaded[:index_extension].rfind("]")
if index_id_end + 1 == index_extension: # 動画が開始から終了まですべてダウンロードされている場合
sec_begin_download = 0
sec_end_download = 0
path_download = downloaded
break
else: # 切り抜き付近のみがダウンロードされている場合
index_sec_end = downloaded[:index_extension].rfind("-")
index_sec_begin = downloaded[:index_sec_end].rfind("_")
if downloaded[index_sec_end + 1:index_extension].isdecimal(): # partファイルの場合は無視
sec_begin_download = int(downloaded[index_sec_begin + 1:index_sec_end]) / 1000
sec_end_download = int(downloaded[index_sec_end + 1:index_extension]) / 1000
if (sec_begin_download <= sec_begin - SEC_INTERVAL_BEGIN_CLIP or sec_begin_download == 0) and sec_end_download >= sec_end + SEC_INTERVAL_END_CLIP: # 適するダウンロード動画が存在する場合
path_download = downloaded
break
if path_download != "":
break
sec_begin_download = round(sec_begin - SEC_INTERVAL_BEGIN_DOWNLOAD)
if sec_begin_download < 0:
sec_begin_download = 0
sec_end_download = round(sec_end + SEC_INTERVAL_END_DOWNLOAD)
str_sec_begin_download = str(round(sec_begin_download * 1000)).zfill(8)
str_sec_end_download = str(round(sec_end_download * 1000)).zfill(8)
filename_download = dir_download + filename + "_" + str_sec_begin_download + "-" + str_sec_end_download
retry_download -= 1
option = {
"outtmpl": filename_download + ".%(ext)s", # 出力形式 投稿日[動画ID]_開始ミリ秒-終了ミリ秒.mp4
#"format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b", # Download the best mp4 video available, or the best video if no mp4 available
"ignoreerrors": True, # エラーを無視して続行
"download_ranges": lambda info_dict, ydl: [{"start_time": sec_begin_download, "end_time": sec_end_download}],
"cookiefile": cookiefile
}
with yt_dlp.YoutubeDL(option) as ydl:
ydl.download([url])
if path_download != "":
videoclip = VideoFileClip(path_download) # ダウンロードの開始終了秒数をyt-dlpが厳守しない(少し長めになる)ので、切り抜き位置を調整する。現在は、終了秒数が一致している想定で調整している
if sec_end_download <= 0:
sec_end_download = videoclip.duration
sec_begin_clip = videoclip.duration - sec_end_download + sec_begin # = videoclip.duration - (sec_end_download - sec_begin_download) + (sec_begin - sec_begin_download)
sec_end_clip = videoclip.duration - sec_end_download + sec_end
subclip = videoclip.subclip(sec_begin_clip, sec_end_clip)
subclip.write_videofile(path_clip, codec="mpeg4", bitrate="1000000000")
if remove_original and (i == len_results - 1 or id != results[i + 1][0]): # 用済みになった動画は、オプションがTrueなら削除
for downloaded in glob.glob(glob.escape(filename_download_pre) + "*"):
os.remove(downloaded)
def execute(path_results, path_list_date_title, dir_download, dir_clip, download_all, remove_original, cookiefile):
dict_date_title = getDictDateTitle(path_list_date_title)
results = getResults(path_results, dict_date_title)
printDurationFromResults(results)
if download_all:
downloadAllAndClip(results, dir_download, dir_clip, remove_original, cookiefile)
else:
downloadOnlyClip(results, dir_download, dir_clip, remove_original, cookiefile)
def main():
download_all = False # 全編をダウンロードするか
remove_original = False # ダウンロードした元動画を削除するか
for arg in sys.argv[1:]:
download_all = download_all or (arg == "-a" or arg == "-A" or arg == "--all")
remove_original = remove_original or (arg == "-d" or arg == "-D" or arg == "--delete")
execute("extract/results.txt", "extract/list_date_title.txt", "download/", "clip/", download_all, remove_original, os.path.dirname(__file__) + "/auth/cookies.txt")
if __name__ == "__main__":
main()
コマンドからでもダウンロードできるのですが、1動画ごとにダウンロード、切り抜き作成、元動画削除(オプション)をまとめて実行したかったため、pythonスクリプト内でダウンロードすることにしました。
切り抜き周辺のみのダウンロード機能と元動画削除機能をつけることで、大量の切り抜き箇所があったとしてもストレージを圧迫しないようにしています。
動画完成後に切り抜き時刻を変更した場合でも、変更箇所のみ切り抜きし直すようにしました。
切り抜き前後に数秒追加した部分をダウンロードするので、切り抜き開始終了時刻を少し編集しても再ダウンロード不要です。
処理としては、切り抜き部分を完全に含むダウンロード動画が存在するまでダウンロードを繰り返しています。
条件を満たす元動画が存在する状態になったら、該当部分の切り抜き動画を生成します。
この処理を、results.txtで編集された行すべてに対して実施します。
次に、上記で生成された各切り抜きを結合します。
edit_video.py を表示
import os # path.isfile, path.dirname, rename
import gc # collect
import csv # reader
import numpy # array, argmin, abs
from moviepy.editor import *
from moviepy.video.fx.resize import resize
def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
begin = str.find(str_begin) + len(str_begin)
if begin < len(str_begin):
return ""
end = str[begin:].find(str_end) + begin
return str[begin:end]
def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
DELIMITER = ":" # 時間表示の区切り文字
index_delimiter = str.find(DELIMITER)
hour = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
index_delimiter = str.find(DELIMITER)
minute = int(str[:index_delimiter])
str = str[index_delimiter + len(DELIMITER):]
second = float(str)
return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second
def secondToTime(second_src): # 秒数(float)から時間表示(str)に変換
SECOND_PER_MINUTE = 60
MINUTE_PER_HOUR = 60
second_src = round(second_src) # 小数点以下は四捨五入
second = second_src % SECOND_PER_MINUTE
second_src = int((second_src - second) / SECOND_PER_MINUTE)
minute = second_src % MINUTE_PER_HOUR
second_src = int((second_src - minute) / MINUTE_PER_HOUR)
hour = second_src
return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)
def getVideoPath(filename): # 拡張子も含めた動画のパスを取得
list_extension = ("mp4", "webm", "mkv")
for extension in list_extension:
path = filename + "." + extension
if os.path.isfile(path):
return path
print("file not found: " + filename)
return ""
def getDictDateTitle(path): # ファイルから日付とタイトルの辞書を取得
if not os.path.isfile(path):
return {}
dict_date_title = {}
with open(path) as f:
for line in f:
dict_date_title[line[:11]] = (line[12:20], line[21:-1])
return dict_date_title
def getResults(path, dict_date_title): # ファイルから結果を取得
DELIMITER = " " # 区切り文字
results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: チャット数, [n][5]: コメント数, [n][6]: 金額, [n][7]: タイトル, [n][8]: 公開日(表示用文字列)
dict_count_comment = {}
count_clip = 0
count_memo = 0
with open(path) as f:
reader = csv.reader(f, delimiter=DELIMITER)
for row in reader:
if len(row) < 6:
continue
is_comment = row[4] == "0:00:00" # 終了時刻が0ならコメントと判定
id = subStrBegin(row[0], "youtu.be/", "?")
if is_comment:
dict_count_comment[id] = int(row[1])
if row[0][:4] != "http" and "http" in row[0]: # 行頭に変更がなければ除外
count_chat = 0
if not is_comment:
count_chat = int(row[1])
date = row[5]
title = ""
if id in dict_date_title:
date = dict_date_title[id][0]
title = dict_date_title[id][1]
results.append([id, date, timeToSecond(row[3]), timeToSecond(row[4]), count_chat, 0, int(row[2]), title, row[5]])
count_clip += 1
if row[0][:row[0].find("http")].isdecimal(): # 行頭のメモカウント機能
count_memo += int(row[0][:row[0].find("http")])
print("count clip: " + str(count_clip))
if count_memo > 0:
print("count memo: " + str(count_memo))
for result in results: # コメント数をすべてに反映
if result[0] in dict_count_comment:
result[5] = dict_count_comment[result[0]]
return results
def getResolution(results, dir): # 全切り抜きのうち、該当数が最も多い解像度を取得
LIST_HEIGHT_16_9 = [144, 360, 480, 720, 1080, 1440, 2160, 4320] # 一般的な16:9の解像度(の高さ)一覧
list_count = [0] * len(LIST_HEIGHT_16_9)
list_resolution = []
list_duration = []
for (id, date, sec_begin, sec_end, _, _, _, _, _) in results:
str_sec_begin = str(int(sec_begin * 1000)).zfill(8)
str_sec_end = str(int(sec_end * 1000)).zfill(8)
path = getVideoPath(dir + date + "[" + id + "]_" + str_sec_begin + "-" + str_sec_end)
video = VideoFileClip(path)
index_nearest = numpy.argmin(numpy.abs(numpy.array(LIST_HEIGHT_16_9) - video.h))
list_count[index_nearest] += 1
list_resolution.append((video.h, video.w))
list_duration.append(video.duration)
height = LIST_HEIGHT_16_9[list_count.index(max(list_count))]
width = int(height * 16 / 9)
list_target_resolution = []
for (height_original, width_original) in list_resolution:
height_target = height_original
width_target = width_original
if height_target > height or width_target > width or (height_target != height and width_target != width): # 共通解像度に一致するようサイズ変更
height_target = min(height, int(width * height_original / width_original))
width_target = min(width, int(height * width_original / height_original))
list_target_resolution.append((height_target, width_target))
return (width, height), list_target_resolution, list_duration # sizeは(x, y), target_resolutionは(y, x)。不満はmoviepy開発陣へ
def generateTimestamp(results, list_duration): # タイムスタンプ用の文字列を生成 各切り抜きの開始時刻、投稿日、URL
NEWLINE = "\n" # 改行文字
SEC_CLUSTERING = 90 # 間隔が90秒未満の場合、同じ事象に対する切り抜きだと判定(初期値)。その場合、それらのタイムスタンプのカウントを同じにする (例: 1. 2-1. 2-2. 3. 4. ...)
timestamp = ""
timestamp1 = ""
timestamp_month = ""
timestamp_year = ""
len_results = len(results)
sec_sum = 0
count_num = 0
count_sequence = 0
current_month = "0000/00"
current_year = "0000"
for i in range(len_results):
(id, date, sec_begin, _, _, _, _, _, release_date) = results[i]
url = "https://youtu.be/" + id + "?t=" + str(int(sec_begin)) + "s"
if i > 0 and id == results[i - 1][0] and sec_begin < results[i - 1][3] + SEC_CLUSTERING:
count_sequence += 1
timestamp += secondToTime(sec_sum) + " " + str(count_num) + "-" + str(count_sequence) + ". " + release_date + " "
else:
count_num += 1
count_sequence = 1
timestamp += secondToTime(sec_sum) + " " + str(count_num) + ". " + release_date + " "
timestamp1 += secondToTime(sec_sum) + " " + str(count_num) + ". " + release_date + " "
timestamp1 += url + NEWLINE
if release_date[:7] != current_month:
current_month = release_date[:7]
timestamp_month += secondToTime(sec_sum) + " " + str(count_num) + ". " + current_month + " "
timestamp_month += url + NEWLINE
if release_date[:4] != current_year:
current_year = release_date[:4]
timestamp_year += secondToTime(sec_sum) + " " + str(count_num) + ". " + current_year + " "
timestamp_year += url + NEWLINE
timestamp += url + NEWLINE
sec_sum += list_duration[i] # 実際の動画の duration と sec_end - sec_begin では誤差(前者が最大+0.05s程度)が発生し、累積すると数秒単位の誤差になってしまう。これを防ぐため、実際の動画の duration を使う
return timestamp, timestamp1, timestamp_month, timestamp_year
def displayText(release_date, count_chat, count_comment, yen): # 切り抜き動画中に表示する文字
DISPLAY_DATE = True # 公開日の表示オプション
DISPLAY_COUNT = True # 該当チャット数の表示オプション
DISPLAY_YEN = True # スーパーチャット金額(円)の表示オプション
NEWLINE = "\n" # 改行文字
display_text = ""
if DISPLAY_DATE:
display_text += release_date + NEWLINE
if DISPLAY_COUNT and count_chat > 0:
display_text += "関連チャット数: " + str(count_chat) + NEWLINE
if DISPLAY_COUNT and count_comment > 0:
display_text += "関連コメント数: " + str(count_comment) + NEWLINE
if DISPLAY_YEN and yen > 0:
display_text += "スパチャ総額: ¥" + str(yen) + NEWLINE
if len(display_text) > 0:
display_text = display_text[:- len(NEWLINE)]
return display_text
def subClip(resolution, target_resolution, title, text, path, path_font=""): # 各切り抜きのサイズを合わせ、文字とフェード効果を付与
SEC_FADEIN = 1 # フェードイン秒数
SEC_FADEOUT = 1 # フェードアウト秒数
SEC_AUDIO_FADEIN = 0.2 # 音のフェードイン秒数
SEC_AUDIO_FADEOUT = 0.2 # 音のフェードアウト秒数
COLOR_FONT = "#ffffff" # フォント色
COLOR_BACKGROUND = "#000000c0" # テキストの背景色
(width, height) = resolution
(height_target, width_target) = target_resolution
videoclip = VideoFileClip(path, target_resolution=target_resolution).set_position((int((width - width_target) / 2), int((height - height_target) / 2))) # 指定の解像度で動画読み込み
fontsize = int(height / 20)
titlesize = int(fontsize / 2)
position_title = (16, 8)
duration = videoclip.duration
if path_font == "":
titleclip = TextClip(txt=title, fontsize=titlesize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND).set_position(position_title).set_duration(duration)
position_text = (position_title[0], position_title[1] + titleclip.h)
textclip = TextClip(txt=text, fontsize=fontsize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND).set_position(position_text).set_duration(duration)
else:
titleclip = TextClip(txt=title, font=path_font, fontsize=titlesize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND).set_position(position_title).set_duration(duration)
position_text = (position_title[0], position_title[1] + titleclip.h)
textclip = TextClip(txt=text, font=path_font, fontsize=fontsize, color=COLOR_FONT, bg_color=COLOR_BACKGROUND).set_position(position_text).set_duration(duration)
return CompositeVideoClip(clips=[videoclip.audio_normalize().audio_fadein(SEC_AUDIO_FADEIN).audio_fadeout(SEC_AUDIO_FADEOUT), titleclip, textclip], size=resolution).fadein(SEC_FADEIN).fadeout(SEC_FADEOUT)
def mergeClip(results, resolution, list_target_resolution, dir, path_dst_video, path_font): # 全切り抜きを結合
SEC_PER_PART = 60 * 24 # まとめて処理できる最大秒数。この秒数ごとの動画を生成し、最後に結合する
COUNT_PER_PART = 128 # まとめて処理できる最大動画数。この個数ごとの動画を生成し、最後に結合する
list_video = []
len_results = len(results)
path_dst_video_pre = path_dst_video[:path_dst_video.rfind(".")] # 拡張子より前
path_dst_video_extension = path_dst_video[path_dst_video.rfind("."):]
part_current = 0
duration_current = 0
duration_sum = 0
count_current = 0
for i in range(len_results):
(id, date, sec_begin, sec_end, count_chat, count_comment, yen, title, release_date) = results[i]
str_sec_begin = str(int(sec_begin * 1000)).zfill(8)
str_sec_end = str(int(sec_end * 1000)).zfill(8)
path = getVideoPath(dir + date + "[" + id + "]_" + str_sec_begin + "-" + str_sec_end)
text = displayText(release_date, count_chat, count_comment, yen)
list_video.append(subClip(resolution, list_target_resolution[i], title, text, path, path_font))
duration_current += list_video[-1].duration
count_current += 1
if duration_current >= SEC_PER_PART or count_current >= COUNT_PER_PART or i == len_results - 1: # まとめて処理できる許容量を超えた場合、現在のリスト内の動画をすべて結合して出力
print(secondToTime(duration_sum) + " -> " + secondToTime(duration_sum + duration_current))
print("[", end="")
for video in list_video:
print(secondToTime(video.duration), end=", ")
print("]")
path_current = path_dst_video_pre + "_part" + str(part_current) + path_dst_video_extension
if part_current >= 0: # ToDo: 変更を含むpartファイルのみ作成し直す。現状は、すべてのpartファイルが作成し直される
concatenate_videoclips(list_video).write_videofile(path_current, codec="mpeg4", bitrate="1000000000")
list_video.clear()
part_current += 1
duration_sum += duration_current
duration_current = 0
count_current = 0
gc.collect() # 各動画出力ごとにメモリ解放を明示的に指定
if part_current == 1: # 許容量に到達しなかった場合は結合処理が不要
os.rename(path_dst_video_pre + "_part0" + path_dst_video_extension, path_dst_video)
else:
list_part = []
for i in range(part_current):
path_current = path_dst_video_pre + "_part" + str(i) + path_dst_video_extension
list_part.append(VideoFileClip(path_current))
concatenate_videoclips(list_part).write_videofile(path_dst_video, codec="mpeg4", bitrate="1000000000")
list_part.clear()
for i in range(part_current): # 一時ファイルをすべて削除
path_current = path_dst_video_pre + "_part" + str(i) + path_dst_video_extension
os.remove(path_current)
def execute(path_results, path_list_date_title, dir_video, path_dst_video, path_dst_timestamp, path_font=""):
dict_date_title = getDictDateTitle(path_list_date_title)
results = getResults(path_results, dict_date_title)
resolution, list_target_resolution, list_duration = getResolution(results, dir_video)
print("total: " + secondToTime(sum(list_duration)))
timestamp, timestamp1, timestamp_month, timestamp_year = generateTimestamp(results, list_duration)
with open(path_dst_timestamp, "w") as f:
f.write(timestamp)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "1" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp1)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "_month" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp_month)
with open(path_dst_timestamp[:path_dst_timestamp.rfind(".")] + "_year" + path_dst_timestamp[path_dst_timestamp.rfind("."):], "w") as f:
f.write(timestamp_year)
mergeClip(results, resolution, list_target_resolution, dir_video, path_dst_video, path_font)
def main():
execute("extract/results.txt", "extract/list_date_title.txt", "clip/", "dst/clip.mp4", "dst/timestamp.txt", os.path.dirname(__file__) + "/font/MPLUSRounded1c-Regular.ttf")
if __name__ == "__main__":
main()
すべての切り抜きで解像度を合わせる必要があるため、解像度の最頻値を最終的な切り抜きの解像度とします。
遊び心として、チャット数とスーパーチャット金額を動画内に表示する機能を付けています。
加えて、基本的な情報として、日付や動画タイトルも表示するようにしています。
また、見やすいように、結合部分にフェードを付けています。
長編の動画に対応するため、一定時間ごとに一度結合動画を作成し、最後にそれらをまとめて結合することで、処理中にメモリが不足しないようにしました。
(数時間くらいの動画なら作成できるはずです。より長い動画を考慮するなら、この階層数が変動するようにする必要がありますが、何度も結合処理をすると品質劣化につながるので、現状は2階層までの実装としています)
最後に、各切り抜きの長さからタイムスタンプを生成します。
YouTubeのチャプターに対応する形式で出力しています。
環境構築
環境構築についても簡単に触れておきます。
まずはツールを実行できる環境を整えます。
(yt-dlpとmoviepyを使えるようにします)
対応OSはLinux(私の場合はWindowsを使っているため、Ubuntu22.04LTS / WSL2)です。Windowsの場合は前述の通りWSL2を、Macの場合は少しシェルスクリプトを修正するかdockerコンテナ等でお使いください。
setup.sh
#!/bin/sh
# 作業用ルートディレクトリで実行 source setup.sh [arg1]
# 引数(任意1個): Pythonのバージョン(3.12等。デフォルト値は3)
python_version=${1:-3}
sudo apt update
sudo apt install -y "python${python_version}-venv" imagemagick ffmpeg
mkdir -p {src,data,venv}
rm -fr venv/yt-dlp_moviepy
"python${python_version}" -m venv venv/yt-dlp_moviepy
source venv/yt-dlp_moviepy/bin/activate
pip install -U pip
pip install -U yt-dlp
pip install -U moviepy
deactivate
source setup.sh
Pythonのバージョンを指定する場合 ↓
source setup.sh 3.12
pythonをインストールし、yt-dlpとmoviepyをpipでインストールしているだけです。仮想環境には標準機能のvenvを使っています。(imagemagickはmoviepyで必要です)
実行後は以下のようなディレクトリ構成になります。
.
├── data
├── setup.sh
├── src
└── venv
└── yt-dlp_moviepy
srcディレクトリはソースコード用、venvはPython仮想環境用、dataは本ツールで出力するデータ格納用です。
次に、ImageMagickのデフォルトのセキュリティポリシーではMoviepyのTextClipを実行できないため、以下の変更を加えます。
/etc/ImageMagick-6/policy.xml
の91行目
変更前: <policy domain="path" rights="none" pattern="@*"/>
変更後: <policy domain="path" rights="read|write" pattern="@*"/>
最後に、関連ファイル2点です。
1点目として、日本語を表示するため、日本語対応のフォントを配置しています。(日本語を表示しない場合は省略可能です)
配置場所は src/font/MPLUSRounded1c-Regular.ttf
です。
フォントファイルは、Google Fontsからダウンロードするか、上記GitHubリポジトリをクローンしてください。
もちろん、他の好きなフォントを使っても構いません。
2点目として、ログイン状態で処理できるよう、Cookieファイルを配置しています。
配置場所は src/auth/cookies.txt
です。
上記ファイルは空なので、ログインが必要な場合はブラウザ等から取得したCookieファイルで上書きします。ブラウザの拡張機能を使うと簡単に取得できます。
手順まとめ
- 環境構築
source setup.sh
- チャットの取得、抽出、クラスタリング
source src/chat.sh 作成するディレクトリのパス YouTubeのチャンネルID 検索文字列(厳しめ) 検索文字列(緩め) 取得開始日(YYYYMMDD) 取得終了日(YYYYMMDD)
- 実際にYouTubeで確認し、results.txtを編集
- 動画ダウンロード、動画編集
source src/video.sh 作成したディレクトリのパス オプション
最後に
YouTubeライブ配信アーカイブの切り抜き動画作成を自動化するツールを作成したので紹介しました。
データのダウンロードは時間の無駄なので、YouTube上のデータを直接扱えるようなサービスが今後登場すると嬉しいですね。
今回作成したコード量で切り抜き動画作成の大部分を自動化できてしまう上、LLM等機械学習系のAI分野の技術が成熟してきているので、人間が切り抜き動画を作る時代はすぐに終わりを迎えるでしょう。
質問や指摘、要望等があれば、下のコメント欄にお願いします。
それでは、また。
コメント一覧
自動化ツールの作成と公開ありがとうございます。
当方全くプログラミングがわからず、色々調べてやってはみたのですがうまくいきません・・・。
もし可能でしたらご教授いただけると嬉しいです。
全く分からずに調べてやっているので的はずれな認識等あるかと思います・・・。
とりあえずGithubに公開していただいているデータをダウンロードし
srcフォルダのchat.shのファイルの
$1に保存先のディレクトリ
$2にチャンネルID
こちらを入力しました。しかしプログラムの実行?ができません。
というかやり方がわかりません・・・。
いろいろ調べたところ、Jupyter Labを使えばいいのかな?(多分違いますよね)と思って導入してみたのですがダメでした。
pythonの挙動もおかしいような気がしています。
管理者権限で開かないと正常に動作しません。
clustering_chat等のpythonファイルをクリックすると一瞬pythonが起動して、動作しません。
予想になってしまうんですが、pythonが問題なく動作していれば普通にプログラムも実行されるのかな?とは思ったりしています。
ただ現状なにが正しくできていて、なにが間違っているのかもわからないため、どこに原因があるのかがわかりません・・・。
これだけの情報で伝わるのかもわかりませんが、どの辺りに問題がありそうなのかだけでも教えていただけたら嬉しいです・・・。
プログラムの公開と詳細があるにも関わらず、こんなにも分からないものかと自分でも驚きました・・・。
お手数ですがご確認お願いします。
返信が遅くなり申し訳ありません。
リポジトリは常に更新しているので、今の最新版で試していたでけないでしょうか。
もしかしたら、実行する位置が違う可能性があります。ダウンロードしたらそのディレクトリに入り、そこで src/chat.sh などのコードを実行してください。
また、pythonが管理者権限でないと実行できないのはおかしいです。本ツールはコマンドラインから実行することを想定していますが、何かのツールに依存した実行方法をとっていませんか。
あと、トラブルシューティングは詳細な状況が分からなければ解決が難しいため、エラーが発生したならその内容や、今まで実行した内容をそのままコピペしていただきたいです。