【切り抜き自動化/Python】YouTubeアーカイブのチャット/コメント取得、切り抜き箇所特定、動画編集(yt-dlp/moviepy)

ツール開発
スポンサーリンク

こんにちは。

現在、YouTubeでは長時間の配信を多く見かけるようになり、その結果切り抜き動画の需要が高まっているように思います。

今回、切り抜き動画作成をほとんど自動化するツールを作成したので紹介します。

主に以下の処理を自動化しました。

  • ライブ配信のアーカイブからチャットを取得する
  • 特定のキーワードに該当するチャットを抽出する
  • 該当の動画をダウンロードする
  • 動画編集(切り抜き、結合)する

また、本記事で紹介するコードはすべてGitHub上にも公開しています。

使用したパッケージ

チャットと動画のダウンロード

チャットと動画のダウンロードにはyt-dlpを使用しました。

このツールを使うと、YouTube(に限らず多くの動画配信プラットフォーム)の大体のデータを取得することができます。

aptやpipでインストールできるため扱いやすいです。もちろんGitから取得しても良いのですが。

例えば、YouTubeのあるチャンネル内の動画すべてのチャットとコメントを取得するには、以下のコマンドを一行実行するだけで実現できます。

yt-dlp --skip-download --write-subs --write-comments -o "%(upload_date)s[%(id)s]" チャンネルURL

--write-subsでライブチャットを取得、--write-commentsでコメントを取得、--skip-downloadで動画のダウンロードをスキップ、-oで出力名のパターンを指定しています。
動画をダウンロードしたい場合は--skip-downloadをつけなければ良いです。

このように、コマンド一行で非常に簡単にデータを取得できるようになっています。
オプションも多数用意されているため、カスタマイズ性が高い点も魅力です。

因みに、pipでインストールした場合はpythonコード内のメソッドからも実行可能です。

Git yt-dlp
PyPI yt-dlp

YouTube Data APIについて

もともとチャットの取得には、Googleが提供するYouTube Data APIを使う予定でした。
しかし、このAPIは現在配信中のライブのチャットを取得することはできますが、過去のアーカイブには非対応でした。
そういう訳で、この方法では動画のURLリストを取得するくらいしかできず、チャットの取得には別の手段をとる必要があったため断念しました。(クローラを用意して実際にページにアクセスするなど)

YouTube Data API

動画編集

動画編集にはmoviepyを使用しました。

こちらはよく使われている安定のパッケージだと思います。
動画編集に必要な機能をほとんど備えています。(バグは多い気がしますが)

例えば、今回使用している動画の切り抜きと結合は、以下のように非常に分かりやすい記述で実現できます。

切り抜き

video.subclip(sec_begin, sec_end)

結合

concatenate_videoclips(list_video)

PyPI moviepy

コード、使い方、実行例の紹介

ここからは、作成したツールのソースコードと、それを使用した実際の切り抜き動画の作成例を紹介します。

紹介するコードはGitHub上にも公開しています。
(本記事では、可読性と再現性の担保のため、極力ファイル分割やimportを使用しないようにしています。その結果、重複するコードが何度か出てきますがご了承ください。
通常版(最新版)は上記GitHubのmainブランチ、本記事とまったく同じコードはdiary-039ブランチにあります)

環境構築

まずはツールを実行できる環境を整えます。
(yt-dlpとmoviepyを使えるようにします)

対応OSはLinux(私の場合はWindowsを使っているため、Ubuntu22.04LTS / WSL2)です。Windowsの場合は前述の通りWSL2を、Macの場合は少しコードを修正してお使いください。

まず、作業用ディレクトリを作成し、そこに移動します。
作業用ディレクトリ内に以下のシェルスクリプトを配置し、実行します。

setup.sh

#!/bin/sh
# 作業用ルートディレクトリで実行 source setup.sh [arg1]
# 引数(任意1個): Pythonのバージョン(3.12等。デフォルト値は3)

python_version=${1:-3}

sudo apt update
sudo apt install "python${python_version}-venv"

mkdir -p {src,data,venv}
rm -fr venv/yt-dlp_moviepy

"python${python_version}" -m venv venv/yt-dlp_moviepy
source venv/yt-dlp_moviepy/bin/activate
pip install -U pip
pip install -U yt-dlp
pip install -U moviepy
deactivate
source setup.sh

Pythonのバージョンを指定する場合 ↓

source setup.sh 3.12

pythonをインストールし、yt-dlpとmoviepyをpipでインストールしているだけです。仮想環境には標準機能のvenvを使っています。

実行後は以下のようなディレクトリ構成になります。

.
├── data
├── setup.sh
├── src
└── venv
    └── yt-dlp_moviepy

srcディレクトリはソースコード用、venvはPython仮想環境用、dataは本ツールで出力するデータ格納用です。

チャットの取得、抽出、クラスタリング

ここからは、ソースコードや使い方、実際の実行例についてです。
具体的な使い方は実行例を参考にしてください。

チャットとコメントの取得はyt-dlpのコマンドで実行し、その後以下のpythonコードでチャットを1事象ごとにクラスタリングし、候補をリスト化して出力します。

以下のchat.shとclustering_chat.pyをsrcディレクトリ内に保存します。

chat.sh

#!/bin/sh
# 作業用ルートディレクトリで実行 source src/chat.sh arg1 arg2 [arg3, arg4]
# 引数(必須3個 + 任意2個): 作成するディレクトリ名, YouTubeのChannel ID, 検索文字列, 取得開始日(YYYYMMDD), 取得終了日(YYYYMMDD)

source venv/yt-dlp_moviepy/bin/activate
if [ ! -d data/"$1"/live_chat ]
then
  mkdir -p data/"$1"/live_chat
  cd data/"$1"/live_chat
  if [ $# -ge 5 ]
  then
    yt-dlp --skip-download --write-subs --write-comments --dateafter $4 --datebefore $5 -o "%(upload_date)s[%(id)s]" "https://www.youtube.com/channel/$2"
  elif [ $# == 4 ]
  then
    yt-dlp --skip-download --write-subs --write-comments --dateafter $4 -o "%(upload_date)s[%(id)s]" "https://www.youtube.com/channel/$2"
  else
    yt-dlp --skip-download --write-subs --write-comments -o "%(upload_date)s[%(id)s]" "https://www.youtube.com/channel/$2"
  fi
  cd ../../..
fi
if [ $# -ge 3 ]
then
  mkdir -p data/"$1"/extract
  cd data/"$1"/extract
  grep -E "$3" ../live_chat/* > search.txt
  python ../../../src/clustering_chat.py "$3"
  cd ../../..
fi
deactivate

yt-dlpコマンドで指定チャンネル内すべての動画のチャットとコメントを取得し、grepコマンドにより正規表現で文字列検索しています。

clustering_chat.py

import sys # argv
import re # search
import operator # itemgetter

def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
  begin = str.find(str_begin) + len(str_begin)
  if begin < len(str_begin):
    return ""
  end = str[begin:].find(str_end) + begin
  return str[begin:end]

def subStrEnd(str, str_begin, str_end): # 該当範囲の文字列を切り出し(終了文字列から検索)
  end = str.find(str_end)
  if end < 0:
    return ""
  begin = str[:end].rfind(str_begin) + len(str_begin)
  return str[begin:end]

def getId(line): # 生データからVideoIDを抽出
  return subStrBegin(line, "[", "]")

def getDate(line): # 生データから投稿日を抽出
  return subStrEnd(line, "/", "[")

def getSecond(line): # 生データから時刻を抽出
  str = subStrBegin(line, '"videoOffsetTimeMsec": "', '"')[:-3] # mili sec で記載されているため下3文字削る
  if str == "":
    return 0
  return int(str)

def getText(line): # 生データからチャットを抽出
  text = ""
  str_begin = '"text": "'
  str_end = '"'
  while True: # チャットがいくつかに分割される場合があるためすべて抽出して結合
    begin = line.find(str_begin) + len(str_begin)
    if begin < len(str_begin):
      break
    end = line[begin:].find(str_end) + begin
    text += line[begin:end]
    line = line[end + len(str_end):]
  return text

def isChat(line): # チャットならTrue、コメントならFalseを返す
  return subStrBegin(line, "].", ".") == "live_chat"

def isValidChat(line): # データが重複していないか確認
  if line.find("addChatItemAction") < 0:
    return False
  return True

def containStr(text, str): # 文字列が含まれるか判定
  return re.search(str, text) != None

def getCommentList(line, str): # 生データから該当コメントを抽出してリスト化
  list_comment = []
  str_begin = '"text": "'
  str_end = '"'
  while True: # 全コメントを抽出
    begin = line.find(str_begin) + len(str_begin)
    if begin < len(str_begin):
      break
    end = line[begin:].find(str_end) + begin
    comment = line[begin:end]
    if containStr(comment, str):
      list_comment.append(comment)
    line = line[end + len(str_end):]
  return list_comment

def secondToTime(second_src): # 秒数(int)から時間表示(str)に変換
  SECOND_PER_MINUTE = 60
  MINUTE_PER_HOUR = 60
  second = second_src % SECOND_PER_MINUTE
  second_src = int((second_src - second) / SECOND_PER_MINUTE)
  minute = second_src % MINUTE_PER_HOUR
  second_src = int((second_src - minute) / MINUTE_PER_HOUR)
  hour = second_src
  return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)

def clusteringChat(path, str=""): # チャットを1事象ごとにクラスタリング
  SEC_CLUSTERING = 60 # チャット間隔が60秒未満の場合、同じ事象に対するコメントだと判定
  MAX_EXAMPLE_CHAT = 12 # 参考例として出力する該当チャット数の最大値
  MAX_EXAMPLE_COMMENT = 12 # 参考例として出力する該当コメント数の最大値
  results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: チャット数, [n][5][m]: チャット例 (m <= 5)
  with open(path) as f:
  # 入力データは、1チャットにつき1行
    for line in f:
      video_id = getId(line)
      video_date = getDate(line)
      if not isChat(line): # コメントの場合、開始終了秒数を0として追加
        if str == "":
          list_comment = []
        else:
          list_comment = getCommentList(line, str)
          if len(list_comment) == 0: # コメントに検索文字列が含まれていない場合は除外
            continue
        results.append([video_id, video_date, 0, 0, len(list_comment), list_comment[0:MAX_EXAMPLE_COMMENT]])
        continue
      if not isValidChat(line): # チャットが既存データと重複する場合は除外
        continue
      chat_second = getSecond(line)
      chat_text = getText(line)
      if str != "" and not containStr(chat_text, str): # チャットに検索文字列が含まれていない場合は除外
        continue
      if len(results) > 0 and results[-1][0] == video_id and results[-1][3] > 0 and chat_second - results[-1][3] < SEC_CLUSTERING:
      # チャット間隔が SEC_CLUSTERING 秒未満の場合、同じ事象に対するコメントだと判定
        results[-1][3] = max(chat_second, results[-1][3])
        results[-1][4] += 1
        if results[-1][4] <= MAX_EXAMPLE_CHAT: # チャット例を MAX_EXAMPLE_CHAT 個まで保持
          results[-1][5].append(chat_text)
      else:
        results.append([video_id, video_date, chat_second, chat_second, 1, [chat_text]])
  return results

def writeResults(results, path): # 結果を出力: URL 暫定チャット数 開始秒数 終了秒数 投稿日 チャット例(最大5個)
  DELIMITER = " " # 出力時の区切り文字
  NEWLINE = "\n" # 改行文字
  SEC_BUFFER = 30 # チャット1個目の何秒前から動画を確認するか
  with open(path, "w") as f:
    for (id, date, sec_begin, sec_end, count, list_text) in results:
      second = max(sec_begin - SEC_BUFFER, 0)
      f.write("https://www.youtube.com/watch?v=" + id + "&t=" + str(second) + "s" + DELIMITER)
      f.write(str(count) + DELIMITER)
      f.write(secondToTime(sec_begin) + DELIMITER)
      f.write(secondToTime(sec_end) + DELIMITER)
      f.write(date + DELIMITER)
      for text in list_text:
        f.write(text + DELIMITER)
      f.write(NEWLINE)

def execute(dir, file_src, file_dst, str=""):
  results = clusteringChat(dir + file_src, str)
  results_sorted = sorted(results, key=operator.itemgetter(1, 0)) # 日付順に並び替え 日付が同じ場合はID順
  writeResults(results_sorted, dir + file_dst)

def main():
  str = ""
  if len(sys.argv) > 1:
    str = sys.argv[1]
  execute("./", "search.txt", "results.txt", str)

if __name__ == "__main__":
  main()

該当するチャットから60秒間該当チャットがなかった場合、事象が終了したと判断しています。

以上を実行するためには、コメントに記載の通り以下のコマンドを実行します。

source src/chat.sh 作成するディレクトリ名 YouTubeのチャンネルID 検索文字列 取得開始日(YYYYMMDD) 取得終了日(YYYYMMDD)

開始日と終了日は任意です。検索文字列は正規表現で指定可能です。

実行後、data/作成したディレクトリ名/extract/results.txt に候補が出力されるため、候補を実際にYouTubeの動画で確認します。

実際に切り抜きに使う動画は、開始秒数と終了秒数を編集したうえで、判別用に行頭に何か一文字追記します。

この後の切り抜き動画作成時に、results.txtの上から順に結合されるため、切り抜き順序を入れ替えたい場合はここで行を入れ替えておきます。デフォルトは投稿日順です。

余談ですが、テキストエディタとしてVSCodeを利用すると、Ctrlを押しながらリンクをクリックすればブラウザで該当URLのページが開かれるので便利です。

ここまでで自動化の第一段階は終了なので、一旦実行例を見てみます。

さて、実行例としては、VTuberのくしゃみ切り抜き動画の生成過程を紹介します。

幸いなことに(?)、VTuber界隈には「くしゃみ助かる」文化があるためチャットから判断しやすく、かつ偶発性や発生頻度がテスト用に適していると判断したためです。

テスト用に、チャットが活発であるくらいに人気があるうえ、ライブ配信数が少なめの配信者を対象としたかったため、にじさんじの「剣持刀也」さんを選びました。イェア!

実行例は以下の通りです。

source src/chat.sh kenmochi UCv1fFr156jc65EMiLbaLImw "くしゃみ|クシャミ"

検索文字列はひらがなとカタカナのorとしました。ここの条件を緩くし過ぎると候補の数が膨大になってしまうので、ある程度条件を厳しめにするのが良いでしょう。

日付は指定していないので全期間の動画が対象です。
ダウンロードには半日程度かかったと思います。(ここが最も時間のかかる処理だと思われます)

実行後は以下のようなディレクトリ構成になります。

.
├── data
│   └── kenmochi
│       ├── extract
│       │   ├── results.txt
│       │   └── search.txt
│       └── live_chat
│           ├── 20180323[NjHYJqnKoOs].info.json
│           ├── 20180323[NjHYJqnKoOs].live_chat.json
│           ├── ...
├── setup.sh
├── src
│   ├── chat.sh
│   ├── clustering_chat.py
│   ├── download_clip.py
│   ├── edit_video.py
│   ├── font
│   │   └── MPLUSRounded1c-Regular.ttf
│   ├── sum_superchat.py
│   └── video.sh
└── venv
    └── yt-dlp_moviepy

data/kenmochi/extract/results.txt に候補のリストが保存されています。これは、grepコマンドにより出力されたsearch.txtを事象ごとにクラスタリングした結果です。

実行例では、search.txtの1231行から、候補が189行にクラスタリングされました。

この結果をもとに実際にYouTubeで確認し、results.txtを編集します。

results.txt 編集前 (先頭4行)

https://www.youtube.com/watch?v=RmCyzVIoa4g&t=1309s 2 0:22:19 0:22:20 20180417 くしゃみ民もおるぞ くしゃみ民もいるぞー 
https://www.youtube.com/watch?v=87XLyhuR1_E&t=0s 6 0:00:00 0:00:00 20180423 40:21 くしゃみ くしゃみ 40:22\n\n1:06:52 「使用…利用目的とかわかんないですけどね」\n\n1:17:01 同人誌の話\n\n1:21:19『寝れる人すごいですよねだって先生頑張ってるのに…』 40:22 〜40:28 くしゃみ 顔認識が少し甘いのかなと思うところがこれまでにちょくちょくあったのに、クシャミの表情はやたらとしっかりしてて笑った 40:22 あっくしゃみ出そう\n40:28 クシュン ちゃんとくしゃみが出そうな顔が再現されてるのが凄い 
https://www.youtube.com/watch?v=87XLyhuR1_E&t=2402s 83 0:40:32 0:41:42 20180423 くしゃみ民歓喜 くしゃみ民 くしゃみ民歓喜 くしゃみ民歓喜 くしゃみ民大歓喜 かわいいくしゃみ くしゃみ民 くしゃみかわいい くしゃみ可愛いんだけど くしゃみ民歓喜 可愛いくしゃみするな くしゃみ民救済 
https://www.youtube.com/watch?v=87XLyhuR1_E&t=3971s 3 1:06:41 1:07:13 20180423 くしゃみ入れて 「くしゃみ」ボイス くしゃみ民に媚びろ 

results.txt 編集後 (先頭4行)

https://www.youtube.com/watch?v=RmCyzVIoa4g&t=1309s 2 0:22:19 0:22:20 20180417 くしゃみ民もおるぞ くしゃみ民もいるぞー 
https://www.youtube.com/watch?v=87XLyhuR1_E&t=0s 6 0:00:00 0:00:00 20180423 40:21 くしゃみ くしゃみ 40:22\n\n1:06:52 「使用…利用目的とかわかんないですけどね」\n\n1:17:01 同人誌の話\n\n1:21:19『寝れる人すごいですよねだって先生頑張ってるのに…』 40:22 〜40:28 くしゃみ 顔認識が少し甘いのかなと思うところがこれまでにちょくちょくあったのに、クシャミの表情はやたらとしっかりしてて笑った 40:22 あっくしゃみ出そう\n40:28 クシュン ちゃんとくしゃみが出そうな顔が再現されてるのが凄い 
-https://www.youtube.com/watch?v=87XLyhuR1_E&t=2402s 83 0:40:20 0:40:58 20180423 くしゃみ民歓喜 くしゃみ民 くしゃみ民歓喜 くしゃみ民歓喜 くしゃみ民大歓喜 かわいいくしゃみ くしゃみ民 くしゃみかわいい くしゃみ可愛いんだけど くしゃみ民歓喜 可愛いくしゃみするな くしゃみ民救済 
https://www.youtube.com/watch?v=87XLyhuR1_E&t=3971s 3 1:06:41 1:07:13 20180423 くしゃみ入れて 「くしゃみ」ボイス くしゃみ民に媚びろ 

切り抜き対象とした候補は開始時刻と終了時刻を編集し、行頭に「-」を追記しました。

手動が必要なのは、実際にYouTubeで確認するこの部分のみです。

動画ダウンロード、動画編集 (+ スパチャ額集計)

ここからは、先程編集した結果をもとに切り抜き動画を生成します。

これはただの遊び心なのですが、動画をダウンロードする前に、スーパーチャットの金額を動画内に表示する機能を付けたいと思います。

sum_superchat.py

import sys # argv
import os # path.isfile
import re # search
import csv # reader

def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
  begin = str.find(str_begin) + len(str_begin)
  if begin < len(str_begin):
    return ""
  end = str[begin:].find(str_end) + begin
  return str[begin:end]

def getSecond(line): # 生データから時刻を抽出
  str = subStrBegin(line, '"videoOffsetTimeMsec": "', '"')[:-3] # mili sec で記載されているため下3文字削る
  if str == "":
    return 0
  return int(str)

def getText(line): # 生データからチャットを抽出
  text = ""
  str_begin = '"text": "'
  str_end = '"'
  while True: # チャットがいくつかに分割される場合があるためすべて抽出して結合
    begin = line.find(str_begin) + len(str_begin)
    if begin < len(str_begin):
      break
    end = line[begin:].find(str_end) + begin
    text += line[begin:end]
    line = line[end + len(str_end):]
  return text

def getYen(line): # 生データからスーパーチャット金額を抽出
  YEN_PER = {"¥": 1, "$": 150, "€": 160} # 円, ドル, ユーロ に対応 (為替レートは2024/02/13時点)
  str = subStrBegin(line, '"purchaseAmountText": {"simpleText": "', '"')
  if str == "":
    return 0
  if str[0] not in YEN_PER:
    return 0
  return float(str[1:].replace(",", "")) * YEN_PER[str[0]]

def isValidChat(line): # データが重複していないか確認
  if line.find("addChatItemAction") < 0:
    return False
  return True

def containStr(text, str): # 文字列が含まれるか判定
  return re.search(str, text) != None

def getCommentList(line, str): # 生データから該当コメントを抽出してリスト化
  list_comment = []
  str_begin = '"text": "'
  str_end = '"'
  while True: # 全コメントを抽出
    begin = line.find(str_begin) + len(str_begin)
    if begin < len(str_begin):
      break
    end = line[begin:].find(str_end) + begin
    comment = line[begin:end]
    if containStr(comment, str):
      list_comment.append(comment)
    line = line[end + len(str_end):]
  return list_comment

def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
  SECOND_PER_MINUTE = 60
  MINUTE_PER_HOUR = 60
  DELIMITER = ":" # 時間表示の区切り文字
  index_delimiter = str.find(DELIMITER)
  hour = int(str[:index_delimiter])
  str = str[index_delimiter + len(DELIMITER):]
  index_delimiter = str.find(DELIMITER)
  minute = int(str[:index_delimiter])
  str = str[index_delimiter + len(DELIMITER):]
  second = float(str)
  return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second

def getResults(path): # ファイルから結果を取得
  DELIMITER = " " # 区切り文字
  results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数
  with open(path) as f:
    reader = csv.reader(f, delimiter=DELIMITER)
    for row in reader:
      if row[0][:4] != "http": # 行頭に変更がなければ除外
        results.append((subStrBegin(row[0], "=", "&"), row[4], timeToSecond(row[2]), timeToSecond(row[3])))
  return results

def sumSuperchat(sec_begin, sec_end, str, path_chat, path_comment): # スーパーチャットの合計金額、チャット数を取得
  SEC_CLUSTERING = 90 # スーパーチャットかチャットの間隔が60秒未満の場合、同じ事象に対するコメントだと判定
  sum_yen = 0
  count_chat = 0
  if os.path.isfile(path_chat):
    with open(path_chat) as f:
      for line in f:
        if not isValidChat(line):
          continue
        second = getSecond(line)
        text = getText(line)
        yen = getYen(line)
        contain_str = str != "" and containStr(text, str)
        if second >= sec_begin:
          if second < sec_end + SEC_CLUSTERING:
            if yen > 0 or contain_str:
              sum_yen += yen
              count_chat += 1
              if second > sec_end:
                sec_end = second
          else:
            break
  count_comment = 0
  if os.path.isfile(path_comment):
    with open(path_comment) as f:
      for line in f:
        if str != "":
          count_comment += len(getCommentList(line, str))
  return int(sum_yen), count_chat, count_comment

def sumSuperchatList(results, str, dir): # 全候補に対するスーパーチャットの合計金額、チャット数を返す
  list_sum = []
  for (id, date, sec_begin, sec_end) in results:
    path_chat = dir + date + "[" + id + "].live_chat.json"
    path_comment = dir + date + "[" + id + "].info.json"
    list_sum.append(sumSuperchat(sec_begin, sec_end, str, path_chat, path_comment))
  return list_sum

def writeSumSuperchat(list_sum, path): # 結果を出力
  DELIMITER = " " # 出力時の区切り文字
  NEWLINE = "\n" # 改行文字
  with open(path, "w") as f:
    for sum_superchat in list_sum:
      f.write(str(sum_superchat[0]) + DELIMITER + str(sum_superchat[1]) + DELIMITER + str(sum_superchat[2]) + NEWLINE)

def execute(str_search, path_src, dir, path_dst):
  results = getResults(path_src)
  list_sum = sumSuperchatList(results, str_search, dir)
  writeSumSuperchat(list_sum, path_dst)

def main():
  str = ""
  if len(sys.argv) > 1:
    str = sys.argv[1]
  execute(str, "results.txt", "../live_chat/", "superchat.txt")

if __name__ == "__main__":
  main()

切り抜き付近のスーパーチャットの合計金額を取得しています。
また、より正確なチャット数も取得しています。

次に、動画をダウンロードします。

download_clip.py

import sys # argv
import os # remove, path.isfile
import csv # reader
import yt_dlp # YoutubeDL
from moviepy.editor import *

def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
  begin = str.find(str_begin) + len(str_begin)
  if begin < len(str_begin):
    return ""
  end = str[begin:].find(str_end) + begin
  return str[begin:end]

def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
  SECOND_PER_MINUTE = 60
  MINUTE_PER_HOUR = 60
  DELIMITER = ":" # 時間表示の区切り文字
  index_delimiter = str.find(DELIMITER)
  hour = int(str[:index_delimiter])
  str = str[index_delimiter + len(DELIMITER):]
  index_delimiter = str.find(DELIMITER)
  minute = int(str[:index_delimiter])
  str = str[index_delimiter + len(DELIMITER):]
  second = float(str)
  return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second

def getResults(path): # ファイルから結果を取得
  DELIMITER = " " # 区切り文字
  results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: URL
  with open(path) as f:
    reader = csv.reader(f, delimiter=DELIMITER)
    for row in reader:
      if row[0][:4] != "http": # 行頭に変更がなければ除外
        results.append((subStrBegin(row[0], "=", "&"), row[4], timeToSecond(row[2]), timeToSecond(row[3]), row[0][row[0].find("http"):row[0].find("&")]))
  return results

def downloadClip(results, remove_original): # 各動画のダウンロードと切り抜き
  OPTION = {
    "outtmpl": "%(upload_date)s[%(id)s].%(ext)s", # 出力形式 投稿日[動画ID].mp4
    "ignoreerrors": True, # エラーを無視して続行
  }
  MAX_RETRY_DOWNLOAD = 6 # ダウンロードに失敗した際の最大再試行回数
  len_results = len(results)
  with yt_dlp.YoutubeDL(OPTION) as ydl:
    count_same_id = 0
    for i in range(len_results):
      (id, date, sec_begin, sec_end, url) = results[i]
      filename = date + "[" + id + "]"
      path_download = filename + ".mp4"
      path_clip = filename + "_clip" + str(count_same_id) + ".mp4"
      if not os.path.isfile(path_clip): # 出力先ファイルが既に存在する場合は動画生成しない
        retry_download = MAX_RETRY_DOWNLOAD
        while not os.path.isfile(path_download) and retry_download > 0: # 存在しない場合のみダウンロード
          retry_download -= 1
          ydl.download([url])
        if os.path.isfile(path_download): # ダウンロードに失敗した場合はスキップ
          if sec_begin < sec_end: # 時刻指定に誤りがある場合はスキップ
            videoclip = VideoFileClip(path_download).subclip(sec_begin, sec_end)
            videoclip.write_videofile(path_clip, codec="mpeg4", bitrate="1000000000")
      count_same_id += 1
      id_next = ""
      if i < len_results - 1:
        id_next = results[i + 1][0]
      if (id != id_next):
        count_same_id = 0
        if remove_original: # 用済みになった動画は、オプションがTrueなら削除
          if os.path.isfile(path_download):
            os.remove(path_download)

def execute(path, remove_original):
  results = getResults(path)
  downloadClip(results, remove_original)

def main():
  remove_original = len(sys.argv) > 1 and (sys.argv[1] == "-d" or sys.argv[1] == "-D") # ダウンロードした元動画を削除するか
  execute("../extract/results.txt", remove_original)

if __name__ == "__main__":
  main()

コマンドからでもダウンロードできるのですが、1動画ごとにダウンロード、切り抜き作成、元動画削除(オプション)をまとめて実行したかったため、pythonコード内でダウンロードすることにしました。

元動画削除機能をつけることで、大量の切り抜き箇所があったとしてもストレージを圧迫しないようにしています。

動画完成後に切り抜き時刻を変更した場合でも、削除された動画のみもう一度ダウンロードや切り抜きするようにしました。

最後に、上記で生成された各切り抜きを結合します。

edit_video.py

import os # path.isfile
import csv # reader
import numpy # array, argmin, abs
from moviepy.editor import *
from moviepy.video.fx.resize import resize

def subStrBegin(str, str_begin, str_end): # 該当範囲の文字列を切り出し(開始文字列から検索)
  begin = str.find(str_begin) + len(str_begin)
  if begin < len(str_begin):
    return ""
  end = str[begin:].find(str_end) + begin
  return str[begin:end]

def timeToSecond(str): # 時間表示(str)から秒数(float)に変換
  SECOND_PER_MINUTE = 60
  MINUTE_PER_HOUR = 60
  DELIMITER = ":" # 時間表示の区切り文字
  index_delimiter = str.find(DELIMITER)
  hour = int(str[:index_delimiter])
  str = str[index_delimiter + len(DELIMITER):]
  index_delimiter = str.find(DELIMITER)
  minute = int(str[:index_delimiter])
  str = str[index_delimiter + len(DELIMITER):]
  second = float(str)
  return (hour * MINUTE_PER_HOUR + minute) * SECOND_PER_MINUTE + second

def secondToTime(second_src): # 秒数(float)から時間表示(str)に変換
  SECOND_PER_MINUTE = 60
  MINUTE_PER_HOUR = 60
  second_src = int(second_src)
  second = second_src % SECOND_PER_MINUTE
  second_src = int((second_src - second) / SECOND_PER_MINUTE)
  minute = second_src % MINUTE_PER_HOUR
  second_src = int((second_src - minute) / MINUTE_PER_HOUR)
  hour = second_src
  return str(hour) + ":" + str(minute).zfill(2) + ":" + str(second).zfill(2)

def getResults(path): # ファイルから結果を取得
  DELIMITER = " " # 区切り文字
  results = [] # [n][0]: VideoID, [n][1]: 投稿日, [n][2]: 開始秒数, [n][3]: 終了秒数, [n][4]: チャット数, [n][5]: コメント数, [n][6]: 金額, [n][7]: URL
  with open(path) as f:
    reader = csv.reader(f, delimiter=DELIMITER)
    for row in reader:
      if row[0][:4] != "http": # 行頭に変更がなければ除外
        results.append([subStrBegin(row[0], "=", "&"), row[4], timeToSecond(row[2]), timeToSecond(row[3]), row[1], 0, 0, row[0][row[0].find("http"):row[0].find("&")]])
  return results

def getYenChatList(path): # ファイルからスーパーチャットの合計金額、チャット数を取得
  DELIMITER = " " # 区切り文字
  if not os.path.isfile(path):
    return []
  list_yen_chat = []
  with open(path) as f:
    reader = csv.reader(f, delimiter=DELIMITER)
    for row in reader:
      list_yen_chat.append((int(row[0]), int(row[1]), int(row[2])))
  return list_yen_chat

def updateYenChat(results, path): # 結果のうち、スーパーチャットの合計金額、チャット数を正確な値に更新
  list_yen_chat = getYenChatList(path)
  len_list = len(list_yen_chat)
  for i in range(len_list):
    results[i][4] = list_yen_chat[i][1]
    results[i][5] = list_yen_chat[i][2]
    results[i][6] = list_yen_chat[i][0]

def displayText(date, count_chat, count_comment, yen): # 切り抜き動画中に表示する文字
  DISPLAY_DATE = True # 投稿日の表示オプション
  DISPLAY_COUNT = True # 該当チャット数の表示オプション
  DISPLAY_YEN = True # スーパーチャット金額(円)の表示オプション
  NEWLINE = "\n" # 改行文字
  display_text = ""
  if DISPLAY_DATE:
    display_text += date[:4] + "/" + date[4:6] + "/" + date[6:] + NEWLINE
  if DISPLAY_COUNT and count_chat > 0:
    display_text += "関連チャット数: " + str(count_chat) + NEWLINE
  if DISPLAY_COUNT and count_comment > 0:
    display_text += "関連コメント数: " + str(count_comment) + NEWLINE
  if DISPLAY_YEN and yen > 0:
    display_text += "スパチャ総額: ¥" + str(yen) + NEWLINE
  if len(display_text) > 0:
    display_text = display_text[:- len(NEWLINE)]
  return display_text

def getResolution(results, dir): # 全切り抜きのうち、該当数が最も多い解像度を取得
  LIST_HEIGHT_16_9 = [144, 360, 480, 720, 1080, 1440, 2160, 4320] # 一般的な16:9の解像度(の高さ)一覧
  list_count = [0] * len(LIST_HEIGHT_16_9)
  for (id, date, _, _, _, _, _, _) in results:
    path = dir + date + "[" + id + "]_clip0.mp4"
    video = VideoFileClip(path)
    index_nearest = numpy.argmin(numpy.abs(numpy.array(LIST_HEIGHT_16_9) - video.h))
    list_count[index_nearest] += 1
  height = LIST_HEIGHT_16_9[list_count.index(max(list_count))]
  width = int(height * 16 / 9)
  return (height, width)

def subClip(resolution, text, path, path_font=""): # 各切り抜きのサイズを合わせ、文字とフェード効果を付与
  SEC_FADEIN = 1 # フェードイン秒数
  SEC_FADEOUT = 1 # フェードアウト秒数
  (height, width) = resolution
  videoclip = VideoFileClip(path, target_resolution=resolution) # 指定の解像度で動画読み込み。改良の余地はあるが、現状moviepyのバグで対応不可
  fontsize = int(height / 20)
  if path_font == "":
    textclip = TextClip(txt=text, fontsize=fontsize, color="#ffffff", bg_color="#000000c0").set_position((16, 16)).set_duration(videoclip.duration)
  else:
    textclip = TextClip(txt=text, font=path_font, fontsize=fontsize, color="#ffffff", bg_color="#000000c0").set_position((16, 16)).set_duration(videoclip.duration)
  return CompositeVideoClip(clips=[videoclip, textclip]).fadein(SEC_FADEIN).fadeout(SEC_FADEOUT)

def mergeClip(results, dir, path_font): # 全切り抜きを結合
  list_video = []
  resolution = getResolution(results, dir)
  len_results = len(results)
  count_same_id = 0
  for i in range(len_results):
    (id, date, sec_begin, sec_end, count_chat, count_comment, yen, url) = results[i]
    path = dir + date + "[" + id + "]_clip" + str(count_same_id) + ".mp4"
    text = displayText(date, count_chat, count_comment, yen)
    list_video.append(subClip(resolution, text, path, path_font))
    count_same_id += 1
    id_next = ""
    if i < len_results - 1:
      id_next = results[i + 1][0]
    if id != id_next:
      count_same_id = 0
  return concatenate_videoclips(list_video)

def generateTimestamp(results): # タイムスタンプ用の文字列を生成 各切り抜きの開始時刻、投稿日、URL
  NEWLINE = "\n" # 改行文字
  timestamp = ""
  len_results = len(results)
  sec_sum = 0
  for i in range(len_results):
    (_, date, sec_begin, sec_end, _, _, _, url) = results[i]
    timestamp += secondToTime(sec_sum) + " " + str(i + 1) + ". " + date[:4] + "/" + date[4:6] + "/" + date[6:] + " "
    timestamp += url + "&t=" + str(int(sec_begin)) + "s" + NEWLINE
    sec_sum += sec_end - sec_begin
  return timestamp

def execute(path_results, dir_video, path_dst_video, path_dst_timestamp, path_superchat="", path_font=""):
  results = getResults(path_results)
  updateYenChat(results, path_superchat)
  video = mergeClip(results, dir_video, path_font)
  video.write_videofile(path_dst_video, codec="mpeg4", bitrate="1000000000")
  timestamp = generateTimestamp(results)
  with open(path_dst_timestamp, "w") as f:
    f.write(timestamp)

def main():
  execute("../extract/results.txt", "../clip/", "clip.mp4", "timestamp.txt", "../extract/superchat.txt", "../../../src/font/MPLUSRounded1c-Regular.ttf")

if __name__ == "__main__":
  main()

結合や文字表示をしています。
(異なるサイズの動画を結合する際の挙動が厄介でした)

これらをまとめて実行できるように以下のシェルスクリプトを使います。

video.sh

#!/bin/sh
# 作業用ルートディレクトリで実行 source src/video.sh arg1 [arg2, arg3]
# 引数(必須1個 + 任意2個): 作成したディレクトリ名, 検索文字列, -d (ダウンロードした元動画を削除する場合)

source venv/yt-dlp_moviepy/bin/activate
cd data/"$1"/extract
if [ $# -ge 2 ]
then
  python ../../../src/sum_superchat.py "$2"
else
  python ../../../src/sum_superchat.py
fi
cd ..
mkdir clip
cd clip
if [ $# -ge 3 ]
then
  python ../../../src/download_clip.py "$3"
else
  python ../../../src/download_clip.py
fi
cd ..
mkdir dst
cd dst
python ../../../src/edit_video.py
cd ../../..
deactivate

以上のファイルをsrcディレクトリ内に保存し、以下のコマンドを実行します。

source src/video.sh 作成したディレクトリ名 検索文字列 -d

検索文字列は任意です。この文字列は、スーパーチャットやチャットの集計に使われます。
ダウンロードした元動画を削除する場合、 -d を指定します。

完成動画は data/作成したディレクトリ名/dst/clip.mp4 に出力されます。
また、タイムスタンプは data/作成したディレクトリ名/dst/timestamp.txt に出力されます。

もし切り抜きの開始終了時刻を変更したい場合、results.txtを編集し直した後、data/作成したディレクトリ名/clip/~~_clip~.mp4 (該当する切り抜き)を削除して、もう一度上記コマンドを実行します。すると、削除された動画のみ再度生成されます。

ここまでで自動化の第二段階は終了なので、実行例を見てみます。

実行例は以下の通りです。

source src/video.sh kenmochi "くしゃみ|クシャミ|嚏|嚔|助か|たすか|くしゃたす|くしゃ民|浴びた|あびた|tskr|TSKR|Bless you|bless you|ぶれすゆ|ブレスユ"

ここでの検索文字列は緩くて良いため、集計に含めるパターンをある程度網羅的に含めました。

生成された動画は以下の通りです。

タイムスタンプは以下のように出力されます。

timestamp.txt (先頭4行)

0:00:00 1. 2018/04/23 https://www.youtube.com/watch?v=87XLyhuR1_E&t=2420s
0:00:38 2. 2018/09/21 https://www.youtube.com/watch?v=5OYX_GTZ1gA&t=2841s
0:00:47 3. 2018/12/18 https://www.youtube.com/watch?v=glEJjsHJt_4&t=1959s
0:00:59 4. 2018/12/18 https://www.youtube.com/watch?v=glEJjsHJt_4&t=3198s

実行後のディレクトリ構成は以下の通りです。

.
├── data
│   └── kenmochi
│       ├── clip
│       │   ├── 20180423[87XLyhuR1_E]_clip0.mp4
│       │   ├── 20180921[5OYX_GTZ1gA]_clip0.mp4
│       │   ├── 20181218[glEJjsHJt_4]_clip0.mp4
│       │   ├── 20181218[glEJjsHJt_4]_clip1.mp4
│       │   ├── 20190518[9ceIieJHgOU]_clip0.mp4
│       │   ├── 20191117[hCTujPem4Nk]_clip0.mp4
│       │   ├── 20200418[gl3rDQgeRWI]_clip0.mp4
│       │   ├── 20200804[l9u29pbHXXk]_clip0.mp4
│       │   ├── 20200804[l9u29pbHXXk]_clip1.mp4
│       │   ├── 20201119[9BvYx6nWa8Q]_clip0.mp4
│       │   ├── 20210114[34Y9T33ErDo]_clip0.mp4
│       │   └── 20231022[rk6YQBxmMY8]_clip0.mp4
│       ├── dst
│       │   ├── clip.mp4
│       │   └── timestamp.txt
│       ├── extract
│       │   ├── results.txt
│       │   ├── search.txt
│       │   └── superchat.txt
│       └── live_chat
│           ├── 20180314[ilaMUJk831s].info.json
│           ├── ...
├── setup.sh
├── src
│   ├── chat.sh
│   ├── clustering_chat.py
│   ├── download_clip.py
│   ├── edit_video.py
│   ├── font
│   │   └── MPLUSRounded1c-Regular.ttf
│   ├── sum_superchat.py
│   └── video.sh
└── venv
    └── yt-dlp_moviepy

以上で切り抜き自動化処理は完了です。

手順まとめ

  1. 環境構築
    source setup.sh
  2. チャットの取得、抽出、クラスタリング
    source src/chat.sh 作成するディレクトリ名 YouTubeのチャンネルID 検索文字列 取得開始日(YYYYMMDD) 取得終了日(YYYYMMDD)
  3. 実際にYouTubeで確認し、results.txtを編集
  4. 動画ダウンロード、動画編集 (+ スパチャ額集計)
    source src/video.sh 作成したディレクトリ名 検索文字列 -d

最後に

YouTubeライブ配信アーカイブの切り抜き動画作成を自動化するツールを作成したので紹介しました。

データのダウンロードは時間の無駄なので、YouTube上のデータを直接扱えるようなサービスが今後登場すると嬉しいですね。

今回作成したコード量で切り抜き動画作成の大部分を自動化できてしまう上、LLM等機械学習系のAI分野の技術が成熟してきているので、人間が切り抜き動画を作る時代はすぐに終わりを迎えるでしょう。

質問や指摘、要望等があれば、下のコメント欄にお願いします。

それでは、また。

ツール開発技術系

コメント

タイトルとURLをコピーしました