From 5e73eee3e1f6a05d82eeefad4527053fb403315e Mon Sep 17 00:00:00 2001 From: Gabriel Piriz Date: Sun, 15 Sep 2024 07:43:35 -0300 Subject: [PATCH] add Admin.merge_streams function (#76) --- lib/algora/admin.ex | 114 ++++++++++++++++++ lib/algora/admin/generate_blurb_thumbnails.ex | 14 +-- lib/algora/clipper.ex | 20 +-- lib/algora/storage.ex | 16 +++ 4 files changed, 139 insertions(+), 25 deletions(-) diff --git a/lib/algora/admin.ex b/lib/algora/admin.ex index bbe9ba7..8c6bb74 100644 --- a/lib/algora/admin.ex +++ b/lib/algora/admin.ex @@ -16,6 +16,120 @@ defmodule Algora.Admin do Finch.build(:get, url) |> Finch.request(Algora.Finch) end + defp get_absolute_media_playlist(video) do + %ExM3U8.MediaPlaylist{timeline: timeline, info: info} = get_media_playlist(video, @tracks.video) + + timeline = timeline + |> Enum.reduce([], fn x, acc -> + case x do + %ExM3U8.Tags.MediaInit{uri: uri} -> [ + %ExM3U8.Tags.MediaInit{uri: Storage.to_absolute(:video, video.uuid, uri)} + | acc + ] + %ExM3U8.Tags.Segment{uri: uri, duration: duration} -> [ + %ExM3U8.Tags.Segment{uri: Storage.to_absolute(:video, video.uuid, uri), duration: duration} + | acc + ] + others -> [others | acc] + end + end) + |> Enum.reverse() + + %ExM3U8.MediaPlaylist{timeline: timeline, info: info} + end + + defp merge_media_playlists(video, nil), do: get_absolute_media_playlist(video) + + defp merge_media_playlists(video, playlist) do + new_playlist = video |> get_absolute_media_playlist + + %ExM3U8.MediaPlaylist{ + playlist + | timeline: playlist.timeline ++ [%ExM3U8.Tags.Discontinuity{} | new_playlist.timeline], + info: %ExM3U8.MediaPlaylist.Info{ + playlist.info + | target_duration: max(playlist.info.target_duration, new_playlist.info.target_duration) + } + } + end + + defp merge_playlists(videos) do + example_playlist = videos |> Enum.at(0) |> get_playlist() + + streams = Enum.map(videos, fn v -> + [item | _] = get_playlist(v) |> then(&Map.get(&1, :items)) + count = get_media_playlist(v, @tracks.video) + |> then(&Enum.count(&1.timeline, fn + %ExM3U8.Tags.Segment{} -> true + _ -> false + end)) + {item, count} + end) + + {example_stream, _} = streams |> Enum.at(0) + + if Enum.all?(streams, fn {x, _} -> example_stream.resolution == x.resolution && example_stream.codecs == x.codecs end) do + max_bandwidth = Enum.map(streams, fn {stream, _} -> Map.get(stream, :bandwidth) end) |> Enum.max(&Ratio.gte?/2) + avg_bandwidth = streams + |> Enum.reduce({0, 0}, fn {s, count}, {avg_sum, count_sum} -> {avg_sum + s.average_bandwidth * count, count_sum + count} end) + |> then(fn {avg, count} -> avg / count end ) + |> Ratio.trunc() + + {:ok, %{ example_playlist + | items: [ + %{ example_stream + | average_bandwidth: avg_bandwidth, + bandwidth: max_bandwidth + } + ] + } + } + else + {:error, "Codecs or resolutions don't match"} + end + end + + defp insert_merged_video(videos, playlist, media_playlist) do + [video | _] = videos + + duration = videos |> Enum.reduce(0, fn v, d -> d + v.duration end) + + result = %{video | duration: duration, id: nil, filename: nil} + |> change() + |> Video.put_video_url(video.format) + |> Repo.insert() + + upload_to = fn uuid, track_atom, content -> Storage.upload( + content, + "#{uuid}/#{@tracks[track_atom]}", + content_type: "application/x-mpegURL" + ) end + + with {:ok, new_video} <- result, + {:ok, _} <- upload_to.(new_video.uuid, :manifest, ExM3U8.serialize(playlist)), + {:ok, _} <- upload_to.(new_video.uuid, :video, "#{ExM3U8.serialize(media_playlist)}#EXT-X-ENDLIST\n") do + result + end + end + + def merge_streams(videos) do + with {:ok, playlist} <- merge_playlists(videos), + media_playlist <- Enum.reduce(videos, nil, &merge_media_playlists/2), + {:ok, new_video} <- insert_merged_video(videos, playlist, media_playlist) do + + ids = Enum.map(videos, &(&1.id)) + Repo.update_all( + from(v in Video, where: v.id in ^ids), + set: [ + visibility: :unlisted, + deleted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) + ] + ) + + {:ok, new_video} + end + end + def get_playlist(video) do {:ok, resp} = get(video.url) {:ok, playlist} = ExM3U8.deserialize_playlist(resp.body, []) diff --git a/lib/algora/admin/generate_blurb_thumbnails.ex b/lib/algora/admin/generate_blurb_thumbnails.ex index dad6e26..9d35589 100644 --- a/lib/algora/admin/generate_blurb_thumbnails.ex +++ b/lib/algora/admin/generate_blurb_thumbnails.ex @@ -1,7 +1,7 @@ defmodule Algora.Admin.GenerateBlurbThumbnails do import Ecto.Query alias Algora.Ads.ProductReview - alias Algora.{Clipper, Repo} + alias Algora.{Clipper, Repo, Storage} require Logger def run do @@ -111,8 +111,8 @@ defmodule Algora.Admin.GenerateBlurbThumbnails do output_path = Path.join(temp_dir, "output.mp4") - init_url = maybe_to_absolute(init_tag.uri, video) - segment_url = maybe_to_absolute(segment_tag.uri, video) + init_url = Storage.to_absolute(:video, video.uuid, init_tag.uri) + segment_url = Storage.to_absolute(:video, video.uuid, segment_tag.uri) init_path = download_file(init_url, Path.join(temp_dir, "init.mp4")) segment_path = download_file(segment_url, Path.join(temp_dir, "segment.m4s")) @@ -128,12 +128,4 @@ defmodule Algora.Admin.GenerateBlurbThumbnails do File.write!(path, body) path end - - defp maybe_to_absolute(uri, video) do - if URI.parse(uri).scheme do - uri - else - Algora.Clipper.to_absolute(:video, video.uuid, uri) - end - end end diff --git a/lib/algora/clipper.ex b/lib/algora/clipper.ex index 72147a6..5961068 100644 --- a/lib/algora/clipper.ex +++ b/lib/algora/clipper.ex @@ -1,14 +1,6 @@ defmodule Algora.Clipper do alias Algora.{Storage, Library} - defp bucket(), do: Algora.config([:buckets, :media]) - - def to_absolute(:video, uuid, uri), - do: "#{Storage.endpoint_url()}/#{bucket()}/#{uuid}/#{uri}" - - def to_absolute(:clip, uuid, uri), - do: "#{Storage.endpoint_url()}/#{bucket()}/clips/#{uuid}/#{uri}" - def clip(video, from, to) do playlists = Algora.Admin.get_media_playlists(video) @@ -20,7 +12,7 @@ defmodule Algora.Clipper do %{ acc | timeline: [ - %ExM3U8.Tags.MediaInit{uri: to_absolute(:video, video.uuid, uri)} | acc.timeline + %ExM3U8.Tags.MediaInit{uri: Storage.to_absolute(:video, video.uuid, uri)} | acc.timeline ] } @@ -39,7 +31,7 @@ defmodule Algora.Clipper do timeline: [ %ExM3U8.Tags.Segment{ duration: duration, - uri: to_absolute(:video, video.uuid, uri) + uri: Storage.to_absolute(:video, video.uuid, uri) } | acc.timeline ] @@ -52,7 +44,7 @@ defmodule Algora.Clipper do timeline: [ %ExM3U8.Tags.Segment{ duration: duration, - uri: to_absolute(:video, video.uuid, uri) + uri: Storage.to_absolute(:video, video.uuid, uri) } | acc.timeline ] @@ -81,14 +73,14 @@ defmodule Algora.Clipper do {:ok, _} = ExAws.S3.put_object_copy( - bucket(), + Storage.bucket(), "clips/#{uuid}/index.m3u8", - bucket(), + Storage.bucket(), "#{video.uuid}/index.m3u8" ) |> ExAws.request() - url = to_absolute(:clip, uuid, "index.m3u8") + url = Storage.to_absolute(:clip, uuid, "index.m3u8") filename = Slug.slugify("#{video.title}-#{Library.to_hhmmss(from)}-#{Library.to_hhmmss(to)}") "ffmpeg -i \"#{url}\" -ss #{ss} -t #{to - from} \"#{filename}.mp4\"" diff --git a/lib/algora/storage.ex b/lib/algora/storage.ex index c60ba0f..dd57325 100644 --- a/lib/algora/storage.ex +++ b/lib/algora/storage.ex @@ -4,6 +4,22 @@ defmodule Algora.Storage do "#{scheme}#{host}" end + def bucket(), do: Algora.config([:buckets, :media]) + + def to_absolute(type, uuid, uri) do + if URI.parse(uri).scheme do + uri + else + to_absolute_uri(type, uuid, uri) + end + end + + defp to_absolute_uri(:video, uuid, uri), + do: "#{endpoint_url()}/#{bucket()}/#{uuid}/#{uri}" + + defp to_absolute_uri(:clip, uuid, uri), + do: "#{endpoint_url()}/#{bucket()}/clips/#{uuid}/#{uri}" + def upload_to_bucket(contents, remote_path, bucket, opts \\ []) do op = Algora.config([:buckets, bucket]) |> ExAws.S3.put_object(remote_path, contents, opts) ExAws.request(op, [])