mirror of
https://github.com/algora-io/tv.git
synced 2025-02-14 01:59:50 +02:00
Revert "Multple manifest HLS with low-latency support"
This reverts commit 0e47ecd6e1b056ac2dc9e09c72998ec07427b4eb.
This commit is contained in:
parent
c7787ccbc8
commit
3b1ed11090
@ -133,7 +133,6 @@ AWS_REGION="auto"
|
||||
AWS_ACCESS_KEY_ID="tid_..."
|
||||
AWS_SECRET_ACCESS_KEY="tsec_..."
|
||||
BUCKET_MEDIA="..."
|
||||
TRANSCODE=4320p60|2160p60|1440p60|1440p30|720p30|360p30|180p30
|
||||
```
|
||||
|
||||
<!-- ARCHITECTURE -->
|
||||
|
@ -249,12 +249,6 @@ const Hooks = {
|
||||
provider.library = HLS;
|
||||
provider.config = {
|
||||
targetlatency: 6, // one segment
|
||||
capLevelToPlayerSize: true,
|
||||
capLevelOnFPSDrop: false,
|
||||
startFragPrefetch: true,
|
||||
startLevel: -1,
|
||||
testBandwidth: !opts.is_live,
|
||||
lowLatencyMode: opts.is_live,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
@ -1,9 +1,6 @@
|
||||
import Config
|
||||
|
||||
config :algora,
|
||||
mode: :dev,
|
||||
resume_rtmp: true,
|
||||
transcode: System.get_env("TRANSCODE")
|
||||
config :algora, mode: :dev, resume_rtmp: true
|
||||
|
||||
config :algora, :buckets,
|
||||
media: System.get_env("BUCKET_MEDIA"),
|
||||
|
@ -14,8 +14,7 @@ end
|
||||
|
||||
config :algora,
|
||||
hf_token: System.get_env("HF_TOKEN"),
|
||||
resume_rtmp: true,
|
||||
transcode: System.get_env("TRANSCODE")
|
||||
resume_rtmp: true
|
||||
|
||||
config :replicate,
|
||||
replicate_api_token: System.get_env("REPLICATE_API_TOKEN")
|
||||
|
@ -31,7 +31,7 @@ config :algora, AlgoraWeb.Embed.Endpoint,
|
||||
server: false
|
||||
|
||||
# Print only warnings and errors during test
|
||||
config :logger, level: :warning
|
||||
config :logger, level: :warn
|
||||
|
||||
# Initialize plugs at runtime for faster test compilation
|
||||
config :phoenix, :plug_init_mode, :runtime
|
||||
|
@ -4,6 +4,11 @@ alias Algora.{Accounts, Library, Storage, Repo}
|
||||
alias Algora.Library.Video
|
||||
|
||||
defmodule Algora.Admin do
|
||||
@tracks %{
|
||||
manifest: "index.m3u8",
|
||||
audio: "g3cFYXVkaW8.m3u8",
|
||||
video: "g3cFdmlkZW8.m3u8"
|
||||
}
|
||||
|
||||
def kill_ad_overlay_processes do
|
||||
find_ad_overlay_processes() |> Enum.each(&Process.exit(&1, :kill))
|
||||
@ -54,9 +59,9 @@ defmodule Algora.Admin do
|
||||
Finch.build(:get, url) |> Finch.request(Algora.Finch)
|
||||
end
|
||||
|
||||
defp get_absolute_media_playlist(video, manifest_name) do
|
||||
defp get_absolute_media_playlist(video) do
|
||||
%ExM3U8.MediaPlaylist{timeline: timeline, info: info} =
|
||||
get_media_playlist(video, manifest_name)
|
||||
get_media_playlist(video, @tracks.video)
|
||||
|
||||
timeline =
|
||||
timeline
|
||||
@ -67,6 +72,7 @@ defmodule Algora.Admin do
|
||||
%ExM3U8.Tags.MediaInit{uri: Storage.to_absolute(:video, video.uuid, uri)}
|
||||
| acc
|
||||
]
|
||||
|
||||
%ExM3U8.Tags.Segment{uri: uri, duration: duration} ->
|
||||
[
|
||||
%ExM3U8.Tags.Segment{
|
||||
@ -85,109 +91,113 @@ defmodule Algora.Admin do
|
||||
%ExM3U8.MediaPlaylist{timeline: timeline, info: info}
|
||||
end
|
||||
|
||||
defp merge_media_playlists(videos, playlist) do
|
||||
manifest_names = Enum.map(playlist.items, &Map.get(&1, :uri))
|
||||
merge_media_playlists(videos, videos, nil, manifest_names, [{"index.m3u8", playlist}])
|
||||
end
|
||||
defp merge_media_playlists(video, nil), do: get_absolute_media_playlist(video)
|
||||
|
||||
defp merge_media_playlists(_all, _videos, _playlist, [], acc), do: acc
|
||||
defp merge_media_playlists(all, [], playlist, [manifest_name | manifest_names], acc) do
|
||||
merge_media_playlists(all, all, nil, manifest_names, [{manifest_name, playlist}|acc])
|
||||
end
|
||||
defp merge_media_playlists(all, [video|videos], nil, [manifest_name|_] = manifest_names, acc) do
|
||||
new_playlist = video |> get_absolute_media_playlist(manifest_name)
|
||||
merge_media_playlists(all, videos, new_playlist, manifest_names, acc)
|
||||
end
|
||||
defp merge_media_playlists(all, [video|videos], playlist, [manifest_name|_] = manifest_names, acc) do
|
||||
new_playlist = video |> get_absolute_media_playlist(manifest_name)
|
||||
merge_media_playlists(all, videos, %ExM3U8.MediaPlaylist{
|
||||
defp merge_media_playlists(video, playlist) do
|
||||
new_playlist = video |> get_absolute_media_playlist
|
||||
|
||||
%ExM3U8.MediaPlaylist{
|
||||
playlist
|
||||
| timeline: playlist.timeline ++ [%ExM3U8.Tags.Discontinuity{} | new_playlist.timeline],
|
||||
info: %ExM3U8.MediaPlaylist.Info{
|
||||
playlist.info
|
||||
| target_duration: max(playlist.info.target_duration, new_playlist.info.target_duration)
|
||||
}
|
||||
}, manifest_names, acc)
|
||||
}
|
||||
end
|
||||
|
||||
defp merge_playlists(videos) do
|
||||
example_playlist = videos |> Enum.at(0) |> get_playlist()
|
||||
manifest_names = Enum.map(example_playlist.items, &Map.get(&1, :uri))
|
||||
playlists = Enum.reduce(manifest_names, %{items: []}, fn(manifest_name, acc) ->
|
||||
streams = Enum.map(videos, fn v ->
|
||||
item = get_playlist(v)
|
||||
|> Map.get(:items)
|
||||
|> Enum.find(&match?(^manifest_name, &1.uri))
|
||||
count = get_media_playlist(v, manifest_name)
|
||||
|> then(&Enum.count(&1.timeline, fn
|
||||
%ExM3U8.Tags.Segment{} -> true
|
||||
_ -> false
|
||||
end))
|
||||
|
||||
streams =
|
||||
Enum.map(videos, fn v ->
|
||||
[item | _] = get_playlist(v) |> then(&Map.get(&1, :items))
|
||||
|
||||
count =
|
||||
get_media_playlist(v, @tracks.video)
|
||||
|> then(
|
||||
&Enum.count(&1.timeline, fn
|
||||
%ExM3U8.Tags.Segment{} -> true
|
||||
_ -> false
|
||||
end)
|
||||
)
|
||||
|
||||
{item, count}
|
||||
end)
|
||||
|
||||
{example_stream, _} = streams |> Enum.find(&match?(manifest_name, elem(&1, 0).uri))
|
||||
{example_stream, _} = streams |> Enum.at(0)
|
||||
|
||||
if Enum.all?(streams, fn {x, _} -> example_stream.resolution == x.resolution && example_stream.codecs == x.codecs end) do
|
||||
max_bandwidth = Enum.map(streams, fn {stream, _} -> Map.get(stream, :bandwidth) end) |> Enum.max(&Ratio.gte?/2)
|
||||
avg_bandwidth = streams
|
||||
|> Enum.reduce({0, 0}, fn {s, count}, {avg_sum, count_sum} -> {avg_sum + s.average_bandwidth * count, count_sum + count} end)
|
||||
|> then(fn {avg, count} -> avg / count end )
|
||||
|> Ratio.trunc()
|
||||
if Enum.all?(streams, fn {x, _} ->
|
||||
example_stream.resolution == x.resolution && example_stream.codecs == x.codecs
|
||||
end) do
|
||||
max_bandwidth =
|
||||
Enum.map(streams, fn {stream, _} -> Map.get(stream, :bandwidth) end)
|
||||
|> Enum.max(&Ratio.gte?/2)
|
||||
|
||||
%{example_playlist | items: [
|
||||
%{ example_stream | average_bandwidth: avg_bandwidth, bandwidth: max_bandwidth }
|
||||
| acc.items
|
||||
]}
|
||||
else
|
||||
IO.puts("Codecs or resolutions don't match in manifest #{manifest_name}. Skipping.")
|
||||
acc
|
||||
end
|
||||
end)
|
||||
avg_bandwidth =
|
||||
streams
|
||||
|> Enum.reduce({0, 0}, fn {s, count}, {avg_sum, count_sum} ->
|
||||
{avg_sum + s.average_bandwidth * count, count_sum + count}
|
||||
end)
|
||||
|> then(fn {avg, count} -> avg / count end)
|
||||
|> Ratio.trunc()
|
||||
|
||||
{:ok, playlists}
|
||||
{:ok,
|
||||
%{
|
||||
example_playlist
|
||||
| items: [
|
||||
%{example_stream | average_bandwidth: avg_bandwidth, bandwidth: max_bandwidth}
|
||||
]
|
||||
}}
|
||||
else
|
||||
{:error, "Codecs or resolutions don't match"}
|
||||
end
|
||||
end
|
||||
|
||||
defp insert_merged_video(videos) do
|
||||
defp insert_merged_video(videos, playlist, media_playlist) do
|
||||
[video | _] = videos
|
||||
|
||||
duration = videos |> Enum.reduce(0, fn v, d -> d + v.duration end)
|
||||
%{video | duration: duration, id: nil, filename: nil}
|
||||
|
||||
result =
|
||||
%{video | duration: duration, id: nil, filename: nil}
|
||||
|> change()
|
||||
|> Video.put_video_url(:vod, video.format)
|
||||
|> Video.put_video_url(video.type, video.format)
|
||||
|> Repo.insert()
|
||||
end
|
||||
|
||||
def upload_merged_streams(video, playlists) do
|
||||
upload_to = fn uuid, manifest_name, content -> Storage.upload(
|
||||
content,
|
||||
"#{uuid}/#{manifest_name}",
|
||||
content_type: "application/x-mpegURL"
|
||||
) end
|
||||
upload_to = fn uuid, track_atom, content ->
|
||||
Storage.upload(
|
||||
content,
|
||||
"#{uuid}/#{@tracks[track_atom]}",
|
||||
content_type: "application/x-mpegURL"
|
||||
)
|
||||
end
|
||||
|
||||
Enum.all? playlists, fn
|
||||
({"index.m3u8" = manifest_name, playlist}) ->
|
||||
manifest = ExM3U8.serialize(playlist)
|
||||
match?({:ok, _}, upload_to.(video.uuid, manifest_name, manifest))
|
||||
({manifest_name, playlist}) ->
|
||||
manifest = "#{ExM3U8.serialize(playlist)}#EXT-X-ENDLIST\n"
|
||||
match?({:ok, _}, upload_to.(video.uuid, manifest_name, manifest))
|
||||
with {:ok, new_video} <- result,
|
||||
{:ok, _} <- upload_to.(new_video.uuid, :manifest, ExM3U8.serialize(playlist)),
|
||||
{:ok, _} <-
|
||||
upload_to.(
|
||||
new_video.uuid,
|
||||
:video,
|
||||
"#{ExM3U8.serialize(media_playlist)}#EXT-X-ENDLIST\n"
|
||||
) do
|
||||
result
|
||||
end
|
||||
end
|
||||
|
||||
def merge_streams(videos) do
|
||||
with {:ok, playlist} <- merge_playlists(videos),
|
||||
media_playlists <- merge_media_playlists(videos, playlist),
|
||||
{:ok, new_video} <- insert_merged_video(videos),
|
||||
true <- upload_merged_streams(new_video, media_playlists) do
|
||||
ids = Enum.map(videos, &(&1.id))
|
||||
Repo.update_all(
|
||||
from(v in Video, where: v.id in ^ids),
|
||||
set: [
|
||||
visibility: :unlisted,
|
||||
deleted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
|
||||
]
|
||||
)
|
||||
media_playlist <- Enum.reduce(videos, nil, &merge_media_playlists/2),
|
||||
{:ok, new_video} <- insert_merged_video(videos, playlist, media_playlist) do
|
||||
ids = Enum.map(videos, & &1.id)
|
||||
|
||||
Repo.update_all(
|
||||
from(v in Video, where: v.id in ^ids),
|
||||
set: [
|
||||
visibility: :unlisted,
|
||||
deleted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
|
||||
]
|
||||
)
|
||||
|
||||
{:ok, new_video}
|
||||
end
|
||||
@ -200,7 +210,9 @@ defmodule Algora.Admin do
|
||||
end
|
||||
|
||||
def get_media_playlist(video, uri) do
|
||||
with {:ok, resp} <- get(get_playlist_url(video, uri)),
|
||||
url = "#{video.url_root}/#{uri}"
|
||||
|
||||
with {:ok, resp} <- get(url),
|
||||
{:ok, playlist} <- ExM3U8.deserialize_media_playlist(resp.body, []) do
|
||||
playlist
|
||||
else
|
||||
@ -208,19 +220,11 @@ defmodule Algora.Admin do
|
||||
end
|
||||
end
|
||||
|
||||
def get_playlist_url(video, %ExM3U8.Tags.Stream{ uri: uri }), do:
|
||||
get_playlist_url(video, uri)
|
||||
def get_playlist_url(video, uri) do
|
||||
String.replace_suffix(video.url, "index.m3u8", uri )
|
||||
end
|
||||
|
||||
def get_media_playlists(video) do
|
||||
with {:ok, master_resp} <- get(video.url),
|
||||
{:ok, master_playlist} <- ExM3U8.deserialize_multivariant_playlist(master_resp.body, []) do
|
||||
Enum.reduce(master_playlist.items, %{}, fn(tag, acc) ->
|
||||
Map.put(acc, tag.uri, get_media_playlist(video, tag))
|
||||
end)
|
||||
end
|
||||
%{
|
||||
video: get_media_playlist(video, @tracks.video),
|
||||
audio: get_media_playlist(video, @tracks.audio)
|
||||
}
|
||||
end
|
||||
|
||||
def set_thumbnail!(id, path \\ nil) do
|
||||
@ -257,11 +261,12 @@ defmodule Algora.Admin do
|
||||
Enum.with_index(chunks),
|
||||
fn {chunk, i} ->
|
||||
IO.puts("#{rounded(100 * i / length(chunks))}%")
|
||||
name = URI.parse(chunk).path |> String.split("/") |> List.last()
|
||||
dl_path = "#{dir}/#{i}-#{name}"
|
||||
|
||||
dl_path = "#{dir}/#{chunk}"
|
||||
|
||||
if not File.exists?(dl_path) do
|
||||
{:ok, :saved_to_file} =
|
||||
:httpc.request(:get, {chunk, []}, [],
|
||||
:httpc.request(:get, {~c"#{video.url_root}/#{chunk}", []}, [],
|
||||
stream: ~c"#{dl_path}.part"
|
||||
)
|
||||
|
||||
@ -277,21 +282,27 @@ defmodule Algora.Admin do
|
||||
|
||||
def download_video(video, dir) do
|
||||
playlists = get_media_playlists(video)
|
||||
timeline = playlists |> Map.values() |> List.first() |> Map.get(:timeline)
|
||||
video_chunks =
|
||||
for n <- timeline,
|
||||
Map.has_key?(n, :uri),
|
||||
do: n.uri
|
||||
|
||||
{time, _} = :timer.tc(&download_chunks/3, [video, video_chunks, dir])
|
||||
video_chunks =
|
||||
for n <- playlists.video.timeline,
|
||||
Map.has_key?(n, :uri),
|
||||
do: n.uri
|
||||
|
||||
audio_chunks =
|
||||
for n <- playlists.audio.timeline,
|
||||
Map.has_key?(n, :uri),
|
||||
do: n.uri
|
||||
|
||||
{time, _} = :timer.tc(&download_chunks/3, [video, video_chunks ++ audio_chunks, dir])
|
||||
|
||||
video_chunks
|
||||
|> Enum.with_index() |> Enum.map(fn {chunk, i} ->
|
||||
name = URI.parse(chunk).path |> String.split("/") |> List.last()
|
||||
"#{dir}/#{i}-#{name}"
|
||||
end)
|
||||
|> Enum.map(fn chunk -> "#{dir}/#{chunk}" end)
|
||||
|> concatenate_files("#{dir}/video.mp4")
|
||||
|
||||
audio_chunks
|
||||
|> Enum.map(fn chunk -> "#{dir}/#{chunk}" end)
|
||||
|> concatenate_files("#{dir}/audio.mp4")
|
||||
|
||||
{_, 0} =
|
||||
System.cmd(
|
||||
"ffmpeg",
|
||||
@ -299,6 +310,8 @@ defmodule Algora.Admin do
|
||||
"-y",
|
||||
"-i",
|
||||
"#{dir}/video.mp4",
|
||||
"-i",
|
||||
"#{dir}/audio.mp4",
|
||||
"-c",
|
||||
"copy",
|
||||
"#{dir}/output.mp4"
|
||||
@ -308,7 +321,7 @@ defmodule Algora.Admin do
|
||||
%File.Stat{size: size} = File.stat!("#{dir}/output.mp4")
|
||||
|
||||
IO.puts(
|
||||
"Downloaded #{dir}/output.mp4 (#{rounded(size / 1_000_000)} MB) in #{rounded(time / 1_000_000)} s (#{rounded(size / time)} MB/s)"
|
||||
"Downloaded #{rounded(size / 1_000_000)} MB in #{rounded(time / 1_000_000)} s (#{rounded(size / time)} MB/s)"
|
||||
)
|
||||
end
|
||||
|
||||
|
@ -24,8 +24,6 @@ defmodule Algora.Application do
|
||||
Algora.Env,
|
||||
{Cluster.Supervisor, [topologies, [name: Algora.ClusterSupervisor]]},
|
||||
{Task.Supervisor, name: Algora.TaskSupervisor},
|
||||
# Start the supervisor for tracking manifest uploads
|
||||
{DynamicSupervisor, strategy: :one_for_one, name: Algora.Pipeline.Storage.ManifestSupervisor},
|
||||
# Start the RPC server
|
||||
{Fly.RPC, []},
|
||||
# Start the Ecto repository
|
||||
|
@ -2,48 +2,10 @@ defmodule Algora.Clipper do
|
||||
alias Algora.{Storage, Library}
|
||||
|
||||
def clip(video, from, to) do
|
||||
clip_timelines(video, from, to) |> List.first() |> elem(0)
|
||||
end
|
||||
|
||||
def create_clip(video, from, to) do
|
||||
uuid = Ecto.UUID.generate()
|
||||
|
||||
[{_manifet_name, %{ss: ss}}|_] = playlists = clip_timelines(video, from, to)
|
||||
|
||||
Enum.each(playlists, fn({manifest_name, %{playlist: playlist, ss: ss}}) ->
|
||||
manifest = "#{ExM3U8.serialize(playlist)}#EXT-X-ENDLIST\n"
|
||||
|
||||
{:ok, _} =
|
||||
Storage.upload(manifest, "clips/#{uuid}/#{manifest_name}",
|
||||
content_type: "application/x-mpegURL"
|
||||
)
|
||||
end)
|
||||
|
||||
{:ok, _} =
|
||||
ExAws.S3.put_object_copy(
|
||||
Storage.bucket(),
|
||||
"clips/#{uuid}/index.m3u8",
|
||||
Storage.bucket(),
|
||||
"#{video.uuid}/index.m3u8"
|
||||
)
|
||||
|> ExAws.request()
|
||||
|
||||
url = Storage.to_absolute(:clip, uuid, "index.m3u8")
|
||||
filename = Slug.slugify("#{video.title}-#{Library.to_hhmmss(from)}-#{Library.to_hhmmss(to)}")
|
||||
|
||||
"ffmpeg -i \"#{url}\" -ss #{ss} -t #{to - from} \"#{filename}.mp4\""
|
||||
end
|
||||
|
||||
def clip_timelines(video, from, to) do
|
||||
playlists = Algora.Admin.get_media_playlists(video)
|
||||
Enum.map(playlists, fn({manifest_name, playlist}) ->
|
||||
{manifest_name, clip_timeline(video, playlist, from, to)}
|
||||
end)
|
||||
end
|
||||
|
||||
def clip_timeline(video, playlist, from, to) do
|
||||
%{timeline: timeline, ss: ss} =
|
||||
playlist.timeline
|
||||
playlists.video.timeline
|
||||
|> Enum.reduce(%{elapsed: 0, ss: 0, timeline: []}, fn x, acc ->
|
||||
case x do
|
||||
%ExM3U8.Tags.MediaInit{uri: uri} ->
|
||||
@ -95,7 +57,7 @@ defmodule Algora.Clipper do
|
||||
end)
|
||||
|> then(fn clip -> %{ss: clip.ss, timeline: Enum.reverse(clip.timeline)} end)
|
||||
|
||||
%{playlist: %{playlist | timeline: timeline}, ss: ss}
|
||||
%{playlist: %{playlists.video | timeline: timeline}, ss: ss}
|
||||
end
|
||||
|
||||
def trim_manifest(video, from, to) do
|
||||
|
@ -1,9 +1,7 @@
|
||||
defmodule Algora.Pipeline do
|
||||
use Membrane.Pipeline
|
||||
require Membrane.Logger
|
||||
|
||||
alias Membrane.Time
|
||||
alias Membrane.RTMP.Messages
|
||||
|
||||
alias Algora.{Admin, Library}
|
||||
alias Algora.Pipeline.HLS.LLController
|
||||
@ -15,7 +13,6 @@ defmodule Algora.Pipeline do
|
||||
@app "live"
|
||||
@terminate_after 60_000 * 60
|
||||
@reconnect_inactivity_timeout 12_000
|
||||
@frame_devisor 1
|
||||
|
||||
defstruct [
|
||||
client_ref: nil,
|
||||
@ -25,8 +22,7 @@ defmodule Algora.Pipeline do
|
||||
dir: nil,
|
||||
reconnect: 0,
|
||||
terminate_timer: nil,
|
||||
waiting_activity: true,
|
||||
data_frame: nil
|
||||
waiting_activity: true
|
||||
]
|
||||
|
||||
def segment_duration(), do: @segment_duration_seconds
|
||||
@ -42,7 +38,7 @@ defmodule Algora.Pipeline do
|
||||
video_uuid: nil
|
||||
}
|
||||
|
||||
{:ok, pid} = with true <- Algora.config([:resume_rtmp]),
|
||||
{:ok, _pid} = with true <- Algora.config([:resume_rtmp]),
|
||||
[{pid, {:pipeline, video_uuid}}] <- Registry.lookup(Algora.Pipeline.Registry, stream_key) do
|
||||
Algora.Pipeline.resume_rtmp(pid, %{params | video_uuid: video_uuid})
|
||||
{:ok, pid}
|
||||
@ -50,12 +46,10 @@ defmodule Algora.Pipeline do
|
||||
_ ->
|
||||
{:ok, _sup, pid} =
|
||||
Membrane.Pipeline.start_link(Algora.Pipeline, params)
|
||||
Process.unlink(pid)
|
||||
Process.flag(:trap_exit, true)
|
||||
{:ok, pid}
|
||||
end
|
||||
|
||||
{Algora.Pipeline.ClientHandler, %{}, pid}
|
||||
{Algora.Pipeline.ClientHandler, %{}}
|
||||
end
|
||||
|
||||
def resume_rtmp(pipeline, params) when is_pid(pipeline) do
|
||||
@ -166,17 +160,9 @@ defmodule Algora.Pipeline do
|
||||
state = terminate_later(state)
|
||||
{[notify_child: {:sink, :disconnected}], state}
|
||||
end
|
||||
|
||||
def handle_child_notification(:start_of_stream, _element, _ctx, state) do
|
||||
{[], state}
|
||||
end
|
||||
|
||||
def handle_child_notification(:start_of_stream, _element, _ctx, state) do
|
||||
{[], state}
|
||||
end
|
||||
|
||||
def handle_child_notification(message, _element, _ctx, state) do
|
||||
Membrane.Logger.info("Unhandled child notification #{inspect(message)}")
|
||||
Membrane.Logger.debug("Unhandled child notificaiton #{inspect(message)}")
|
||||
{[], state}
|
||||
end
|
||||
|
||||
@ -225,6 +211,8 @@ defmodule Algora.Pipeline do
|
||||
}
|
||||
end
|
||||
|
||||
|
||||
@impl true
|
||||
def handle_info(:setup_output, _ctx, %{video: video, dir: dir, reconnect: reconnect} = state) do
|
||||
structure = [
|
||||
#
|
||||
@ -286,61 +274,9 @@ defmodule Algora.Pipeline do
|
||||
def handle_info(:stream_interupted, _ctx, state) do
|
||||
Membrane.Logger.info("Stream interupted #{inspect(state)}")
|
||||
Algora.Library.toggle_streamer_live(state.video, false, true)
|
||||
end
|
||||
|
||||
def handle_info(%Membrane.RTMP.Messages.SetDataFrame{} = _message, _ctx, %{playing: true} = state) do
|
||||
{[], state}
|
||||
end
|
||||
|
||||
def handle_info(%Membrane.RTMP.Messages.SetDataFrame{} = message, _ctx, state) do
|
||||
%{ height: source_height, width: source_width, framerate: source_framerate } = message
|
||||
|
||||
if transcode_slug = Algora.config([:transcode]) do
|
||||
structure = transcode_slug
|
||||
|> String.split("|")
|
||||
|> Enum.map(&String.split(&1, "p"))
|
||||
|> Enum.map(fn([h, f]) -> {String.to_integer(h), String.to_integer(f)} end)
|
||||
|> Enum.filter(fn({target_height, framerate}) ->
|
||||
target_height <= source_height and framerate <= source_framerate
|
||||
end)
|
||||
|> Enum.map(fn({target_height, framerate}) ->
|
||||
height = normalize_scale(target_height)
|
||||
width = normalize_scale(source_width / (source_height / target_height))
|
||||
track_name = "video_#{width}x#{height}p#{framerate}"
|
||||
#
|
||||
get_child(:tee_video)
|
||||
|> child(%Membrane.H264.Parser{
|
||||
output_stream_structure: :annexb,
|
||||
generate_best_effort_timestamps: %{framerate: {trunc(source_framerate), @frame_devisor}}
|
||||
})
|
||||
|> child(Membrane.H264.FFmpeg.Decoder)
|
||||
|> child(%Membrane.FramerateConverter{framerate: {trunc(framerate), @frame_devisor}})
|
||||
|> child(%Membrane.FFmpeg.SWScale.Scaler{
|
||||
output_height: height,
|
||||
output_width: width
|
||||
})
|
||||
|> child(%Membrane.H264.FFmpeg.Encoder{
|
||||
preset: :ultrafast,
|
||||
gop_size: trunc(framerate * 2),
|
||||
})
|
||||
|> via_in(Pad.ref(:input, track_name),
|
||||
options: [
|
||||
track_name: track_name,
|
||||
encoding: :H264,
|
||||
segment_duration: @segment_duration,
|
||||
partial_segment_duration: @partial_segment_duration
|
||||
]
|
||||
)
|
||||
|> get_child(:sink)
|
||||
end)
|
||||
|
||||
spec = {structure, group: :hls_adaptive}
|
||||
{[spec: spec], state}
|
||||
else
|
||||
{[], state}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info({:forward_rtmp, url, ref}, _ctx, state) do
|
||||
spec = [
|
||||
#
|
||||
@ -392,15 +328,12 @@ defmodule Algora.Pipeline do
|
||||
|
||||
def handle_info(:reconnect_inactivity, _ctx, %{ waiting_activity: true } = state) do
|
||||
Membrane.Logger.error("Tried to reconnect but failed #{inspect(state)}")
|
||||
send(self(), :terminate)
|
||||
{[], state}
|
||||
end
|
||||
|
||||
def handle_info(:reconnect_inactivity, _ctx, state) do
|
||||
{[], state}
|
||||
end
|
||||
|
||||
def handle_info(:stream_interupted, _ctx, state) do
|
||||
send(self(), :terminate)
|
||||
|
||||
{[], state}
|
||||
end
|
||||
def handle_info(:reconnect_inactivity, _ctx, state) do
|
||||
{[], state}
|
||||
end
|
||||
|
||||
@ -409,10 +342,6 @@ defmodule Algora.Pipeline do
|
||||
{[terminate: :normal], state}
|
||||
end
|
||||
|
||||
def handle_info(message, _ctx, state) do
|
||||
Membrane.Logger.info("Unhandled notification #{inspect(message)}")
|
||||
{[], state}
|
||||
end
|
||||
|
||||
defp terminate_later(%{terminate_timer: nil} = state) do
|
||||
time = if Algora.config([:resume_rtmp]), do: @terminate_after, else: 0
|
||||
@ -428,11 +357,4 @@ defmodule Algora.Pipeline do
|
||||
:timer.cancel(timer)
|
||||
%{ state | terminate_timer: nil }
|
||||
end
|
||||
|
||||
defp normalize_scale(scale) when is_float(scale), do:
|
||||
scale |> trunc() |> normalize_scale()
|
||||
defp normalize_scale(scale) when is_integer(scale) and scale > 0 do
|
||||
if rem(scale, 2) == 1, do: scale - 1, else: scale
|
||||
end
|
||||
|
||||
end
|
||||
|
@ -54,24 +54,24 @@ defmodule Algora.Pipeline.HLS.EtsHelper do
|
||||
### ETS CONTENT MANAGMENT
|
||||
###
|
||||
|
||||
@spec update_manifest(:ets.table(), String.t(), String.t()) :: true
|
||||
def update_manifest(table, manifest, manifest_name) do
|
||||
:ets.insert(table, {manifest_key(manifest_name), manifest})
|
||||
@spec update_manifest(:ets.table(), String.t()) :: true
|
||||
def update_manifest(table, manifest) do
|
||||
:ets.insert(table, {@manifest_key, manifest})
|
||||
end
|
||||
|
||||
@spec update_delta_manifest(:ets.table(), String.t(), String.t()) :: true
|
||||
def update_delta_manifest(table, delta_manifest, manifest_name) do
|
||||
:ets.insert(table, {delta_manifest_key(manifest_name), delta_manifest})
|
||||
@spec update_delta_manifest(:ets.table(), String.t()) :: true
|
||||
def update_delta_manifest(table, delta_manifest) do
|
||||
:ets.insert(table, {@delta_manifest_key, delta_manifest})
|
||||
end
|
||||
|
||||
@spec update_recent_partial(:ets.table(), partial(), String.t()) :: true
|
||||
def update_recent_partial(table, partial, manifest_name) do
|
||||
:ets.insert(table, {recent_partial_key(manifest_name), partial})
|
||||
@spec update_recent_partial(:ets.table(), partial()) :: true
|
||||
def update_recent_partial(table, partial) do
|
||||
:ets.insert(table, {@recent_partial_key, partial})
|
||||
end
|
||||
|
||||
@spec update_delta_recent_partial(:ets.table(), partial(), String.t()) :: true
|
||||
def update_delta_recent_partial(table, partial, manifest_name) do
|
||||
:ets.insert(table, {delta_recent_partial_key(manifest_name), partial})
|
||||
@spec update_delta_recent_partial(:ets.table(), partial()) :: true
|
||||
def update_delta_recent_partial(table, partial) do
|
||||
:ets.insert(table, {@delta_recent_partial_key, partial})
|
||||
end
|
||||
|
||||
@spec add_partial(:ets.table(), binary(), String.t()) :: true
|
||||
@ -104,26 +104,26 @@ defmodule Algora.Pipeline.HLS.EtsHelper do
|
||||
get_from_ets(video_uuid, filename)
|
||||
end
|
||||
|
||||
@spec get_recent_partial(Video.uuid(), String.t()) ::
|
||||
@spec get_recent_partial(Video.uuid()) ::
|
||||
{:ok, {non_neg_integer(), non_neg_integer()}} | {:error, atom()}
|
||||
def get_recent_partial(video_uuid, manifest_name) do
|
||||
get_from_ets(video_uuid, recent_partial_key(manifest_name))
|
||||
def get_recent_partial(video_uuid) do
|
||||
get_from_ets(video_uuid, @recent_partial_key)
|
||||
end
|
||||
|
||||
@spec get_delta_recent_partial(Video.uuid(), String.t()) ::
|
||||
@spec get_delta_recent_partial(Video.uuid()) ::
|
||||
{:ok, {non_neg_integer(), non_neg_integer()}} | {:error, atom()}
|
||||
def get_delta_recent_partial(video_uuid, manifest_name) do
|
||||
get_from_ets(video_uuid, delta_recent_partial_key(manifest_name))
|
||||
def get_delta_recent_partial(video_uuid) do
|
||||
get_from_ets(video_uuid, @delta_recent_partial_key)
|
||||
end
|
||||
|
||||
@spec get_manifest(Video.uuid(), String.t()) :: {:ok, String.t()} | {:error, atom()}
|
||||
def get_manifest(video_uuid, manifest_name) do
|
||||
get_from_ets(video_uuid, manifest_key(manifest_name))
|
||||
@spec get_manifest(Video.uuid()) :: {:ok, String.t()} | {:error, atom()}
|
||||
def get_manifest(video_uuid) do
|
||||
get_from_ets(video_uuid, @manifest_key)
|
||||
end
|
||||
|
||||
@spec get_delta_manifest(Video.uuid(), String.t()) :: {:ok, String.t()} | {:error, atom()}
|
||||
def get_delta_manifest(video_uuid, manifest_name) do
|
||||
get_from_ets(video_uuid, delta_manifest_key(manifest_name))
|
||||
@spec get_delta_manifest(Video.uuid()) :: {:ok, String.t()} | {:error, atom()}
|
||||
def get_delta_manifest(video_uuid) do
|
||||
get_from_ets(video_uuid, @delta_manifest_key)
|
||||
end
|
||||
|
||||
@spec get_hls_folder_path(Video.uuid()) :: {:ok, String.t()} | {:error, :video_not_found}
|
||||
@ -163,20 +163,4 @@ defmodule Algora.Pipeline.HLS.EtsHelper do
|
||||
defp video_exists?(video_uuid) do
|
||||
:ets.lookup(@videos_to_tables, video_uuid) != []
|
||||
end
|
||||
|
||||
defp manifest_key(manifest_name) when is_binary(manifest_name) do
|
||||
"#{@manifest_key}_#{manifest_name}"
|
||||
end
|
||||
|
||||
defp delta_manifest_key(manifest_name) when is_binary(manifest_name) do
|
||||
"#{@delta_manifest_key}_#{manifest_name}"
|
||||
end
|
||||
|
||||
defp recent_partial_key(manifest_name) when is_binary(manifest_name) do
|
||||
"#{@recent_partial_key}_#{manifest_name}"
|
||||
end
|
||||
|
||||
defp delta_recent_partial_key(manifest_name) when is_binary(manifest_name) do
|
||||
"#{@delta_recent_partial_key}_#{manifest_name}"
|
||||
end
|
||||
end
|
||||
|
@ -12,9 +12,9 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
defstruct @enforce_keys ++
|
||||
[
|
||||
table: nil,
|
||||
preload_hints: %{},
|
||||
manifest: %{},
|
||||
delta_manifest: %{}
|
||||
preload_hints: [],
|
||||
manifest: %{waiting_pids: %{}, last_partial: nil},
|
||||
delta_manifest: %{waiting_pids: %{}, last_partial: nil}
|
||||
]
|
||||
|
||||
@type segment_sn :: non_neg_integer()
|
||||
@ -27,14 +27,11 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
directory: Path.t(),
|
||||
video_pid: pid(),
|
||||
table: :ets.table() | nil,
|
||||
manifest: %{ String.t() => status() },
|
||||
delta_manifest: %{ String.t() => status() },
|
||||
preload_hints: %{String.t() => [pid()]}
|
||||
manifest: status(),
|
||||
delta_manifest: status(),
|
||||
preload_hints: [pid()]
|
||||
}
|
||||
|
||||
@default_status %{waiting_pids: %{}, last_partial: nil}
|
||||
@segment_suffix_regex ~r/(_\d*_part)?\.m4s$/
|
||||
|
||||
###
|
||||
### HLS Controller API
|
||||
###
|
||||
@ -89,30 +86,30 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
@doc """
|
||||
Handles manifest requests with specific partial requested (ll-hls)
|
||||
"""
|
||||
@spec handle_manifest_request(Video.uuid(), partial(), String.t()) ::
|
||||
@spec handle_manifest_request(Video.uuid(), partial()) ::
|
||||
{:ok, String.t()} | {:error, atom()}
|
||||
def handle_manifest_request(video_uuid, partial, filename) do
|
||||
with {:ok, last_partial} <- EtsHelper.get_recent_partial(video_uuid, filename) do
|
||||
def handle_manifest_request(video_uuid, partial) do
|
||||
with {:ok, last_partial} <- EtsHelper.get_recent_partial(video_uuid) do
|
||||
unless partial_ready?(partial, last_partial) do
|
||||
wait_for_manifest_ready(video_uuid, partial, :manifest, filename)
|
||||
wait_for_manifest_ready(video_uuid, partial, :manifest)
|
||||
end
|
||||
|
||||
EtsHelper.get_manifest(video_uuid, filename)
|
||||
EtsHelper.get_manifest(video_uuid)
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Handles delta manifest requests with specific partial requested (ll-hls)
|
||||
"""
|
||||
@spec handle_delta_manifest_request(Video.uuid(), partial(), String.t()) ::
|
||||
@spec handle_delta_manifest_request(Video.uuid(), partial()) ::
|
||||
{:ok, String.t()} | {:error, atom()}
|
||||
def handle_delta_manifest_request(video_uuid, partial, filename) do
|
||||
with {:ok, last_partial} <- EtsHelper.get_delta_recent_partial(video_uuid, filename) do
|
||||
def handle_delta_manifest_request(video_uuid, partial) do
|
||||
with {:ok, last_partial} <- EtsHelper.get_delta_recent_partial(video_uuid) do
|
||||
unless partial_ready?(partial, last_partial) do
|
||||
wait_for_manifest_ready(video_uuid, partial, :delta_manifest, filename)
|
||||
wait_for_manifest_ready(video_uuid, partial, :delta_manifest)
|
||||
end
|
||||
|
||||
EtsHelper.get_delta_manifest(video_uuid, filename)
|
||||
EtsHelper.get_delta_manifest(video_uuid)
|
||||
end
|
||||
end
|
||||
|
||||
@ -120,24 +117,24 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
### STORAGE API
|
||||
###
|
||||
|
||||
@spec update_manifest(Video.uuid(), String.t(), String.t()) :: :ok
|
||||
def update_manifest(video_uuid, manifest, filename) do
|
||||
GenServer.cast(registry_id(video_uuid), {:update_manifest, manifest, filename})
|
||||
@spec update_manifest(Video.uuid(), String.t()) :: :ok
|
||||
def update_manifest(video_uuid, manifest) do
|
||||
GenServer.cast(registry_id(video_uuid), {:update_manifest, manifest})
|
||||
end
|
||||
|
||||
@spec update_delta_manifest(Video.uuid(), String.t(), String.t()) :: :ok
|
||||
def update_delta_manifest(video_uuid, delta_manifest, filename) do
|
||||
GenServer.cast(registry_id(video_uuid), {:update_delta_manifest, delta_manifest, filename})
|
||||
@spec update_delta_manifest(Video.uuid(), String.t()) :: :ok
|
||||
def update_delta_manifest(video_uuid, delta_manifest) do
|
||||
GenServer.cast(registry_id(video_uuid), {:update_delta_manifest, delta_manifest})
|
||||
end
|
||||
|
||||
@spec update_recent_partial(Video.uuid(), partial(), :manifest | :delta_manifest, String.t()) :: :ok
|
||||
def update_recent_partial(video_uuid, last_partial, manifest, filename) do
|
||||
GenServer.cast(registry_id(video_uuid), {:update_recent_partial, last_partial, manifest, filename})
|
||||
@spec update_recent_partial(Video.uuid(), partial(), :manifest | :delta_manifest) :: :ok
|
||||
def update_recent_partial(video_uuid, last_partial, manifest) do
|
||||
GenServer.cast(registry_id(video_uuid), {:update_recent_partial, last_partial, manifest})
|
||||
end
|
||||
|
||||
@spec add_partial(Video.uuid(), binary(), binary(), String.t()) :: :ok
|
||||
def add_partial(video_uuid, segment, partial, filename) do
|
||||
GenServer.cast(registry_id(video_uuid), {:add_partial, segment, partial, filename})
|
||||
@spec add_partial(Video.uuid(), binary(), String.t()) :: :ok
|
||||
def add_partial(video_uuid, partial, filename) do
|
||||
GenServer.cast(registry_id(video_uuid), {:add_partial, partial, filename})
|
||||
end
|
||||
|
||||
@spec delete_partial(Video.uuid(), String.t()) :: :ok
|
||||
@ -149,20 +146,6 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
GenServer.cast(registry_id(video_uuid), {:write_to_file, filename, content})
|
||||
end
|
||||
|
||||
# Filename example: muxed_segment_32_g2QABXZpZGVv_5_part.m4s
|
||||
def get_manifest_name(segment_name) do
|
||||
segment_name
|
||||
|> String.replace(@segment_suffix_regex, "")
|
||||
|> String.split("_")
|
||||
|> Enum.drop_while(fn(s) -> !match?({_integer, ""}, Integer.parse(s)) end)
|
||||
|> Enum.drop(1)
|
||||
|> Enum.join("_")
|
||||
|> then(fn
|
||||
("") -> {:error, :unknown_segment_name_format}
|
||||
(name) -> {:ok, "#{name}.m3u8"}
|
||||
end)
|
||||
end
|
||||
|
||||
###
|
||||
### MANAGEMENT API
|
||||
###
|
||||
@ -205,12 +188,11 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:partial_ready?, partial, from, manifest, filename}, state) do
|
||||
manifests = Map.get(state, manifest)
|
||||
state = manifests
|
||||
|> Map.fetch!(filename)
|
||||
def handle_cast({:partial_ready?, partial, from, manifest}, state) do
|
||||
state =
|
||||
state
|
||||
|> Map.fetch!(manifest)
|
||||
|> handle_partial_ready?(partial, from)
|
||||
|> then(&Map.put(manifests, filename, &1))
|
||||
|> then(&Map.put(state, manifest, &1))
|
||||
|
||||
{:noreply, state}
|
||||
@ -223,65 +205,45 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
{:noreply, state}
|
||||
else
|
||||
{:error, _reason} ->
|
||||
preload_hints = if {:ok, manifest_name} = get_manifest_name(filename) do
|
||||
hints_for_file = state
|
||||
|> Map.get(:preload_hints, %{})
|
||||
|> Map.get(filename, [])
|
||||
Map.put(state.preload_hints, manifest_name, [from | hints_for_file])
|
||||
else
|
||||
state.preload_hints
|
||||
end
|
||||
|
||||
{:noreply, %{ state | preload_hints: preload_hints }}
|
||||
{:noreply, %{state | preload_hints: [from | state.preload_hints]}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:update_manifest, manifest, filename}, %{table: table} = state) do
|
||||
EtsHelper.update_manifest(table, manifest, filename)
|
||||
def handle_cast({:update_manifest, manifest}, %{table: table} = state) do
|
||||
EtsHelper.update_manifest(table, manifest)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:update_delta_manifest, delta_manifest, filename}, %{table: table} = state) do
|
||||
EtsHelper.update_delta_manifest(table, delta_manifest, filename)
|
||||
def handle_cast({:update_delta_manifest, delta_manifest}, %{table: table} = state) do
|
||||
EtsHelper.update_delta_manifest(table, delta_manifest)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast(
|
||||
{:update_recent_partial, last_partial, manifest, filename},
|
||||
{:update_recent_partial, last_partial, manifest},
|
||||
%{preload_hints: preload_hints, table: table} = state
|
||||
) do
|
||||
|
||||
case manifest do
|
||||
:manifest -> EtsHelper.update_recent_partial(table, last_partial, filename)
|
||||
:delta_manifest -> EtsHelper.update_delta_recent_partial(table, last_partial, filename)
|
||||
:manifest -> EtsHelper.update_recent_partial(table, last_partial)
|
||||
:delta_manifest -> EtsHelper.update_delta_recent_partial(table, last_partial)
|
||||
end
|
||||
|
||||
manifests = Map.fetch!(state, manifest)
|
||||
|
||||
new_manifests = manifests
|
||||
|> Map.get(filename, @default_status)
|
||||
|> update_and_notify_manifest_ready(last_partial)
|
||||
|> then(&Map.put(manifests, filename, &1))
|
||||
|
||||
new_preload_hints = preload_hints
|
||||
|> Map.get(filename, [])
|
||||
|> update_and_notify_preload_hint_ready()
|
||||
|> then(&Map.put(preload_hints, filename, &1))
|
||||
status = Map.fetch!(state, manifest)
|
||||
|
||||
state =
|
||||
state
|
||||
|> Map.put(manifest, new_manifests)
|
||||
|> Map.put(:preload_hints, new_preload_hints)
|
||||
|> Map.put(manifest, update_and_notify_manifest_ready(status, last_partial))
|
||||
|> Map.put(:preload_hints, update_and_notify_preload_hint_ready(preload_hints))
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:add_partial, partial, _segment_name, partial_name}, %{table: table} = state) do
|
||||
EtsHelper.add_partial(table, partial, partial_name)
|
||||
def handle_cast({:add_partial, partial, filename}, %{table: table} = state) do
|
||||
EtsHelper.add_partial(table, partial, filename)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@ -325,8 +287,8 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
### PRIVATE FUNCTIONS
|
||||
###
|
||||
|
||||
defp wait_for_manifest_ready(video_uuid, partial, manifest, filename) do
|
||||
GenServer.cast(registry_id(video_uuid), {:partial_ready?, partial, self(), manifest, filename})
|
||||
defp wait_for_manifest_ready(video_uuid, partial, manifest) do
|
||||
GenServer.cast(registry_id(video_uuid), {:partial_ready?, partial, self(), manifest})
|
||||
|
||||
receive do
|
||||
:manifest_ready ->
|
||||
@ -380,8 +342,7 @@ defmodule Algora.Pipeline.HLS.LLController do
|
||||
defp preload_hint?(video_uuid, filename) do
|
||||
partial_sn = get_partial_sn(filename)
|
||||
|
||||
with {:ok, manifest_name} <- get_manifest_name(filename),
|
||||
{:ok, recent_partial_sn} <- EtsHelper.get_recent_partial(video_uuid, manifest_name) do
|
||||
with {:ok, recent_partial_sn} <- EtsHelper.get_recent_partial(video_uuid) do
|
||||
{:ok, check_if_preload_hint(partial_sn, recent_partial_sn)}
|
||||
end
|
||||
end
|
||||
|
@ -314,7 +314,7 @@ defmodule Algora.Pipeline.Sink do
|
||||
if any_track_persisted? do
|
||||
# reconfigure tracks to disable partial segments on final manifest
|
||||
tracks = Enum.reduce(manifest.tracks, %{}, fn({name, track}, acc) ->
|
||||
Map.put(acc, name, %Track{ track | mode: :vod, partial_segment_duration: nil })
|
||||
Map.put(acc, name, %Track{ track | partial_segment_duration: nil })
|
||||
end)
|
||||
|
||||
{result, storage} =
|
||||
|
@ -115,7 +115,7 @@ defmodule Algora.Pipeline.SourceBin do
|
||||
@impl true
|
||||
def handle_parent_notification(:kill, _ctx, state) do
|
||||
Process.exit(state.client_ref, :kill)
|
||||
{[], state}
|
||||
{[terminate: :kill], state}
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -11,29 +11,28 @@ defmodule Algora.Pipeline.Storage do
|
||||
@enforce_keys [:directory, :video]
|
||||
defstruct @enforce_keys ++
|
||||
[
|
||||
sequences: %{},
|
||||
partials_in_ets: %{},
|
||||
partial_sn: 0,
|
||||
segment_sn: 0,
|
||||
partials_in_ets: [],
|
||||
video_header: <<>>,
|
||||
video_segment: <<>>,
|
||||
setup_completed?: false,
|
||||
manifest_uploader: nil
|
||||
setup_completed?: false
|
||||
]
|
||||
|
||||
@type partial_ets_key :: String.t()
|
||||
@type sequence_number :: non_neg_integer()
|
||||
@type partial_in_ets ::
|
||||
{{segment_sn :: sequence_number(), partial_sn :: sequence_number()}, partial_ets_key()}
|
||||
@type manifest_name :: String.t()
|
||||
|
||||
@type t :: %__MODULE__{
|
||||
directory: Path.t(),
|
||||
video: Library.Video.t(),
|
||||
sequences: %{ manifest_name() => { sequence_number(), sequence_number() }},
|
||||
partials_in_ets: %{ manifest_name() => [partial_in_ets()] },
|
||||
partial_sn: sequence_number(),
|
||||
segment_sn: sequence_number(),
|
||||
partials_in_ets: [partial_in_ets()],
|
||||
video_header: <<>>,
|
||||
video_segment: <<>>,
|
||||
setup_completed?: boolean(),
|
||||
manifest_uploader: pid()
|
||||
setup_completed?: boolean()
|
||||
}
|
||||
|
||||
@ets_cached_duration_in_segments 4
|
||||
@ -41,8 +40,7 @@ defmodule Algora.Pipeline.Storage do
|
||||
|
||||
@impl true
|
||||
def init(state) do
|
||||
{:ok, uploader} = GenServer.start_link(Algora.Pipeline.Storage.Manifest, state.video)
|
||||
Map.put(state, :manifest_uploader, uploader)
|
||||
state
|
||||
end
|
||||
|
||||
@impl true
|
||||
@ -59,7 +57,8 @@ defmodule Algora.Pipeline.Storage do
|
||||
store_content(parent_id, name, content, metadata, context, state)
|
||||
|
||||
%{mode: :text, type: :manifest} ->
|
||||
cache_manifest(name, content, context, state)
|
||||
cache_manifest(name, content, state)
|
||||
store_content(parent_id, name, content, metadata, context, state)
|
||||
end
|
||||
end
|
||||
|
||||
@ -80,13 +79,11 @@ defmodule Algora.Pipeline.Storage do
|
||||
%{sequence_number: sequence_number, partial_name: partial_name} = metadata,
|
||||
ctx,
|
||||
state
|
||||
) do
|
||||
{:ok, manifest_name} = Algora.Pipeline.HLS.LLController.get_manifest_name(name)
|
||||
|
||||
) do
|
||||
state =
|
||||
process_contents(parent_id, name, contents, metadata, ctx, state)
|
||||
|> update_sequence_numbers(sequence_number, manifest_name)
|
||||
|> add_partial_to_ets(name, partial_name, contents, manifest_name)
|
||||
|> update_sequence_numbers(sequence_number)
|
||||
|> add_partial_to_ets(partial_name, contents)
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
@ -94,13 +91,10 @@ defmodule Algora.Pipeline.Storage do
|
||||
defp cache_manifest(
|
||||
filename,
|
||||
content,
|
||||
context,
|
||||
%__MODULE__{video: video, manifest_uploader: uploader} = state
|
||||
%__MODULE__{video: video} = state
|
||||
) do
|
||||
broadcast!(video.uuid, [LLController, :write_to_file, [video.uuid, filename, content]])
|
||||
|
||||
:ok = GenServer.cast(uploader, {:upload, filename, content, upload_opts(context)})
|
||||
|
||||
unless filename == "index.m3u8" do
|
||||
add_manifest_to_ets(filename, content, state)
|
||||
send_update(filename, state)
|
||||
@ -127,64 +121,51 @@ defmodule Algora.Pipeline.Storage do
|
||||
do: :update_delta_manifest,
|
||||
else: :update_manifest
|
||||
),
|
||||
[video.uuid, manifest, filename]
|
||||
[video.uuid, manifest]
|
||||
])
|
||||
end
|
||||
|
||||
defp add_partial_to_ets(
|
||||
%{
|
||||
partials_in_ets: partials_in_ets,
|
||||
sequences: sequences,
|
||||
segment_sn: segment_sn,
|
||||
partial_sn: partial_sn,
|
||||
video: video
|
||||
} = state,
|
||||
segment_name,
|
||||
partial_name,
|
||||
content,
|
||||
manifest_name
|
||||
content
|
||||
) do
|
||||
broadcast!(video.uuid, [LLController, :add_partial, [
|
||||
video.uuid, content, segment_name, partial_name
|
||||
]])
|
||||
broadcast!(video.uuid, [LLController, :add_partial, [video.uuid, content, partial_name]])
|
||||
|
||||
if partial = sequences[manifest_name] do
|
||||
partials = Map.get(partials_in_ets, manifest_name, [])
|
||||
partials_in_ets = Map.put(partials_in_ets, manifest_name, [{partial, partial_name} | partials])
|
||||
%{state | partials_in_ets: partials_in_ets}
|
||||
else
|
||||
state
|
||||
end
|
||||
partial = {segment_sn, partial_sn}
|
||||
%{state | partials_in_ets: [{partial, partial_name} | partials_in_ets]}
|
||||
end
|
||||
|
||||
defp remove_partials_from_ets(
|
||||
%{
|
||||
partials_in_ets: partials_in_ets,
|
||||
sequences: sequences,
|
||||
video: video,
|
||||
} = state,
|
||||
manifest_name
|
||||
segment_sn: curr_segment_sn,
|
||||
video: video
|
||||
} = state
|
||||
) do
|
||||
if { curr_segment_sn, _} = Map.get(sequences, manifest_name) do
|
||||
{partials, partial_to_be_removed} =
|
||||
Enum.split_with(partials_in_ets[manifest_name], fn {{segment_sn, _partial_sn}, _partial_name} ->
|
||||
segment_sn + (@ets_cached_duration_in_segments) > curr_segment_sn
|
||||
end)
|
||||
|
||||
Enum.each(partial_to_be_removed, fn {_sn, partial_name} ->
|
||||
broadcast!(video.uuid, [LLController, :delete_partial, [video.uuid, partial_name]])
|
||||
{partials_in_ets, partial_to_be_removed} =
|
||||
Enum.split_with(partials_in_ets, fn {{segment_sn, _partial_sn}, _partial_name} ->
|
||||
segment_sn + @ets_cached_duration_in_segments > curr_segment_sn
|
||||
end)
|
||||
|
||||
partials_in_ets = Map.put(partials_in_ets, manifest_name, partials)
|
||||
%{state | partials_in_ets: partials_in_ets}
|
||||
else
|
||||
state
|
||||
end
|
||||
Enum.each(partial_to_be_removed, fn {_sn, partial_name} ->
|
||||
broadcast!(video.uuid, [LLController, :delete_partial, [video.uuid, partial_name]])
|
||||
end)
|
||||
|
||||
%{state | partials_in_ets: partials_in_ets}
|
||||
end
|
||||
|
||||
defp broadcast!(video_uuid, msg), do: LLController.broadcast!(video_uuid, msg)
|
||||
|
||||
defp send_update(filename, %{
|
||||
video: video,
|
||||
sequences: sequences
|
||||
segment_sn: segment_sn,
|
||||
partial_sn: partial_sn
|
||||
}) do
|
||||
manifest =
|
||||
if(String.ends_with?(filename, @delta_manifest_suffix),
|
||||
@ -192,34 +173,23 @@ defmodule Algora.Pipeline.Storage do
|
||||
else: :manifest
|
||||
)
|
||||
|
||||
manifest_name = String.replace(filename, @delta_manifest_suffix, ".m3u8")
|
||||
if partial = sequences[manifest_name] do
|
||||
broadcast!(video.uuid, [LLController, :update_recent_partial, [
|
||||
video.uuid, partial, manifest, filename
|
||||
]])
|
||||
end
|
||||
partial = {segment_sn, partial_sn}
|
||||
|
||||
broadcast!(video.uuid, [LLController, :update_recent_partial, [video.uuid, partial, manifest]])
|
||||
end
|
||||
|
||||
defp update_sequence_numbers(
|
||||
%{sequences: sequences} = state,
|
||||
new_partial_sn,
|
||||
manifest_name
|
||||
%{segment_sn: segment_sn, partial_sn: partial_sn} = state,
|
||||
new_partial_sn
|
||||
) do
|
||||
{segment_sn, partial_sn} = Map.get(sequences, manifest_name, {0, 0})
|
||||
new_segment? = new_partial_sn < partial_sn
|
||||
sequence = if new_segment? do
|
||||
{ segment_sn + 1, new_partial_sn }
|
||||
else
|
||||
{ segment_sn, new_partial_sn }
|
||||
end
|
||||
state = sequences
|
||||
|> Map.put(manifest_name, sequence)
|
||||
|> then(&Map.put(state, :sequences, &1))
|
||||
# If there is a new segment we want to remove partials that are too old from ets
|
||||
|
||||
if new_segment? do
|
||||
remove_partials_from_ets(state, manifest_name)
|
||||
state = %{state | segment_sn: segment_sn + 1, partial_sn: new_partial_sn}
|
||||
# If there is a new segment we want to remove partials that are too old from ets
|
||||
remove_partials_from_ets(state)
|
||||
else
|
||||
state
|
||||
%{state | partial_sn: new_partial_sn}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -1,49 +0,0 @@
|
||||
defmodule Algora.Pipeline.Storage.Manifest do
|
||||
use GenServer, restart: :transient
|
||||
|
||||
require Membrane.Logger
|
||||
|
||||
@delta_suffix_regex ~r/_delta.m3u8$/
|
||||
@delay Algora.Pipeline.segment_duration() * 1000
|
||||
|
||||
def start_link([video]) do
|
||||
GenServer.start_link(__MODULE__, video)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(video) do
|
||||
{:ok, %{video: video, timers: %{}}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:upload, name, contents, upload_opts}, %{timers: timers} = state) do
|
||||
if String.match?(name, @delta_suffix_regex) do
|
||||
{:noreply, state}
|
||||
else
|
||||
if timer_ref = timers[name] do
|
||||
{:ok, :cancel} = :timer.cancel(timer_ref)
|
||||
end
|
||||
|
||||
{:ok, timer_ref} = :timer.send_after(@delay, {
|
||||
:upload_immediate, name, contents, upload_opts
|
||||
})
|
||||
|
||||
timers = Map.put(state.timers, name, timer_ref)
|
||||
{:noreply, %{state | timers: timers }}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:upload_immediate, name, contents, upload_opts}, state) do
|
||||
path = "#{state.video.uuid}/#{name}"
|
||||
with {:ok, _} <- Algora.Storage.upload(contents, path, upload_opts) do
|
||||
Membrane.Logger.info("Uploaded manifest #{path}")
|
||||
else
|
||||
err ->
|
||||
Membrane.Logger.error("Failed to upload #{path}: #{err}")
|
||||
end
|
||||
|
||||
timers = Map.delete(state.timers, name)
|
||||
{:noreply, %{ state | timers: timers }}
|
||||
end
|
||||
end
|
@ -1,17 +0,0 @@
|
||||
defmodule Algora.Pipeline.Storage.ManifestSupervisor do
|
||||
use DynamicSupervisor
|
||||
|
||||
def start_link(init_arg) do
|
||||
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
|
||||
end
|
||||
|
||||
def start_child(video) do
|
||||
spec = %{start: {Algora.Pipeline.Storage.Manifest, :start_link, [video]}}
|
||||
DynamicSupervisor.start_child(__MODULE__, spec)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(init_arg) do
|
||||
DynamicSupervisor.init(strategy: :one_for_one, extra_arguments: [init_arg])
|
||||
end
|
||||
end
|
@ -64,9 +64,9 @@ defmodule AlgoraWeb.HLSContentController do
|
||||
|
||||
result =
|
||||
if String.ends_with?(filename, "_delta.m3u8") do
|
||||
LLController.handle_delta_manifest_request(video_uuid, partial, filename)
|
||||
LLController.handle_delta_manifest_request(video_uuid, partial)
|
||||
else
|
||||
LLController.handle_manifest_request(video_uuid, partial, filename)
|
||||
LLController.handle_manifest_request(video_uuid, partial)
|
||||
end
|
||||
|
||||
case result do
|
||||
@ -101,10 +101,8 @@ defmodule AlgoraWeb.HLSContentController do
|
||||
{:error, :invalid_path} ->
|
||||
{:error, :bad_request, "Invalid filename, got #{filename}"}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("Error handling request, reason: #{inspect(reason)}")
|
||||
{:error, _reason} ->
|
||||
{:error, :not_found, "File not found"}
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
4
mix.exs
4
mix.exs
@ -58,12 +58,10 @@ defmodule Algora.MixProject do
|
||||
{:membrane_core, "~> 1.0"},
|
||||
{:membrane_h264_ffmpeg_plugin, "~> 0.32.3"},
|
||||
{:membrane_http_adaptive_stream_plugin, "~> 0.18.5"},
|
||||
{:membrane_rtmp_plugin, github: "lastcanal/membrane_rtmp_plugin", branch: "data_frame_to_pipeline"}, # "~> 0.26.0"},
|
||||
{:membrane_rtmp_plugin, "~> 0.26.0"},
|
||||
{:membrane_tee_plugin, "~> 0.12.0"},
|
||||
{:membrane_file_plugin, "~> 0.17.2"},
|
||||
{:membrane_mp4_plugin, "~> 0.35.2"},
|
||||
{:membrane_framerate_converter_plugin, "~> 0.8.2"},
|
||||
{:membrane_ffmpeg_swscale_plugin, "~> 0.15.1"},
|
||||
{:mint, "~> 1.0"},
|
||||
{:oban, "~> 2.16"},
|
||||
{:open_api_spex, "~> 3.16"},
|
||||
|
4
mix.lock
4
mix.lock
@ -68,10 +68,8 @@
|
||||
"membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"},
|
||||
"membrane_core": {:hex, :membrane_core, "1.1.1", "4dcff6e9f3b2ecd4f437c20e201e53957731772c0f15b3005062c41f7f58f500", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3802f3fc071505c59d48792487d9927e803d4edb4039710ffa52cdb60bb0aecc"},
|
||||
"membrane_fake_plugin": {:hex, :membrane_fake_plugin, "0.11.0", "3a2d26f15ad4940a4d44cee3354dff38fa9a39963e9b2dcb49802e150ff9a9dc", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c6b6a06eaa4e820d1e4836510ddb4bcb386c8918d0b37542a21caf6b87cbe72"},
|
||||
"membrane_ffmpeg_swscale_plugin": {:hex, :membrane_ffmpeg_swscale_plugin, "0.15.1", "ba4f55ece752f590c8c912536c21f550498099133df7cfb297f35cb5a5b08dec", [:mix], [{:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}], "hexpm", "09cfaba3d602ce4557a03675038da882dd0e0599bf3d669e8dc04c34ed3e0b41"},
|
||||
"membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"},
|
||||
"membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.12.0", "d715ad405af86dcaf4b2f479e34088e1f6738c7280366828e1066b39d2aa493a", [:mix], [{:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}], "hexpm", "a317872d6d394e550c7bfd8979f12a3a1cc1e89b547d75360321025b403d3279"},
|
||||
"membrane_framerate_converter_plugin": {:hex, :membrane_framerate_converter_plugin, "0.8.2", "98c34d372e336729a753bd7eb3a08a19045af16699cbe2ddfee8b8543f52a789", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}], "hexpm", "d8177e8c1ccd5bed7665b74e0d107b81789053882e2a45d4ffa8bd020bfecaf3"},
|
||||
"membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.1", "9e108f4ef9d905ebff2da3ba5e58a5b756b58812f4fa68bd576add68fda310a0", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "39fdef1bf29eac949f65a37ea941f997c22ed042c55af044d27a781b63e82f6b"},
|
||||
"membrane_generator_plugin": {:hex, :membrane_generator_plugin, "0.10.0", "afa8037e3a7d05a31092233c082e37007be114cb370a426390727a1c8ad12036", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}], "hexpm", "cbb4882a7d7a3e1cf37e78a551e6f3d2d026ea8f841facad2ead2e78c4312d88"},
|
||||
"membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.32.3", "40e20024c4d2c8715a11327ac5390ebff048be2e4351022ee84093c669e42fe6", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "118f1461a6ef147e8333201edc5cb42a684d0b3b88347a338323dc3a9badde18"},
|
||||
@ -87,7 +85,7 @@
|
||||
"membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"},
|
||||
"membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"},
|
||||
"membrane_realtimer_plugin": {:hex, :membrane_realtimer_plugin, "0.9.0", "27210d5e32a5e8bfd101c41e4d8c1876e873a52cc129ebfbee4d0ccbea1cbd21", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b2e96d62135ee57ef9a5fdea94b3a9ab1198e5ea8ee248391b89c671125d1b51"},
|
||||
"membrane_rtmp_plugin": {:git, "https://github.com/lastcanal/membrane_rtmp_plugin.git", "0771191ecb42df73fde8766f4b2e23f2d73eb26b", [branch: "data_frame_to_pipeline"]},
|
||||
"membrane_rtmp_plugin": {:hex, :membrane_rtmp_plugin, "0.26.0", "b41ab950545a2e0d985193a9d75dfac56582e8fbdbbeb0f7bd52f13eab5228bc", [:mix], [{:membrane_aac_plugin, "~> 0.18.1", [hex: :membrane_aac_plugin, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_flv_plugin, "~> 0.12.0", [hex: :membrane_flv_plugin, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.0", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.2.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "dbae73841490b4c9fca03628e075a179f9eb92d361c0e2f02adb5102f234a46f"},
|
||||
"membrane_scissors_plugin": {:hex, :membrane_scissors_plugin, "0.8.0", "c8ee6d5b2d452d034d17a65a629bb13872dccb9fa9d232dc9bb85738d5723305", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:stream_split, "~> 0.1.3", [hex: :stream_split, repo: "hexpm", optional: false]}], "hexpm", "22487e6a4d45f4c85a50a93ff5c63e196175c04007365e3bb070d91b8e93ad02"},
|
||||
"membrane_tee_plugin": {:hex, :membrane_tee_plugin, "0.12.0", "f94989b4080ef4b7937d74c1a14d3379577c7bd4c6d06e5a2bb41c351ad604d4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "0d61c9ed5e68e5a75d54200e1c6df5739c0bcb52fee0974183ad72446a179887"},
|
||||
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
|
||||
|
@ -1,92 +0,0 @@
|
||||
defmodule Algora.Pipeline.HLS.EtsHelperTest do
|
||||
@moduledoc false
|
||||
|
||||
use ExUnit.Case, async: false
|
||||
|
||||
alias Algora.Pipeline.HLS.EtsHelper
|
||||
|
||||
@partial <<1, 2, 3>>
|
||||
@partial_name "muxed_segment_1_index.m4s"
|
||||
|
||||
@wrong_partial_name "muxed_segment_101_index.m4s"
|
||||
|
||||
@manifest "manifest"
|
||||
@manifest_name "index.m3u8"
|
||||
@delta_manifest "delta_manifest"
|
||||
@delta_manifest_name "index_delta.m3u8"
|
||||
|
||||
@recent_partial {1, 1}
|
||||
@delta_recent_partial {2, 2}
|
||||
|
||||
@videos_to_tables :videos_to_tables
|
||||
|
||||
setup do
|
||||
video_uuid = UUID.uuid4()
|
||||
|
||||
# Ets tables are not removed during tests because they are automatically removed when the owner process dies.
|
||||
# Therefore, using on_exit (as a separate process) would cause a crash.
|
||||
{:ok, table} = EtsHelper.add_video(video_uuid)
|
||||
|
||||
%{video_uuid: video_uuid, table: table}
|
||||
end
|
||||
|
||||
test "videos managment" do
|
||||
video_uuid = UUID.uuid4()
|
||||
assert {:error, :video_not_found} == EtsHelper.get_partial(video_uuid, @partial_name)
|
||||
|
||||
{:ok, table} = EtsHelper.add_video(video_uuid)
|
||||
assert {:error, :already_exists} == EtsHelper.add_video(video_uuid)
|
||||
|
||||
assert [{video_uuid, table}] == :ets.lookup(@videos_to_tables, video_uuid)
|
||||
|
||||
:ok = EtsHelper.remove_video(video_uuid)
|
||||
assert {:error, "Video: #{video_uuid} doesn't exist"} == EtsHelper.remove_video(video_uuid)
|
||||
|
||||
assert [] == :ets.lookup(@videos_to_tables, video_uuid)
|
||||
end
|
||||
|
||||
test "partials managment", %{video_uuid: video_uuid, table: table} do
|
||||
assert {:error, :file_not_found} == EtsHelper.get_partial(video_uuid, @partial_name)
|
||||
|
||||
EtsHelper.add_partial(table, @partial, @partial_name)
|
||||
|
||||
assert {:ok, @partial} == EtsHelper.get_partial(video_uuid, @partial_name)
|
||||
|
||||
assert {:error, :file_not_found} ==
|
||||
EtsHelper.get_partial(video_uuid, @wrong_partial_name)
|
||||
|
||||
EtsHelper.delete_partial(table, @partial_name)
|
||||
|
||||
assert {:error, :file_not_found} == EtsHelper.get_partial(video_uuid, @partial_name)
|
||||
end
|
||||
|
||||
test "manifests managment", %{video_uuid: video_uuid, table: table} do
|
||||
assert {:error, :file_not_found} == EtsHelper.get_manifest(video_uuid, @manifest_name)
|
||||
assert {:error, :file_not_found} == EtsHelper.get_delta_manifest(video_uuid, @delta_manifest_name)
|
||||
|
||||
EtsHelper.update_manifest(table, @manifest, @manifest_name)
|
||||
|
||||
assert {:ok, @manifest} == EtsHelper.get_manifest(video_uuid, @manifest_name)
|
||||
assert {:error, :file_not_found} == EtsHelper.get_delta_manifest(video_uuid, @delta_manifest_name)
|
||||
|
||||
EtsHelper.update_delta_manifest(table, @delta_manifest, @delta_manifest_name)
|
||||
|
||||
assert {:ok, @manifest} == EtsHelper.get_manifest(video_uuid,@manifest_name)
|
||||
assert {:ok, @delta_manifest} == EtsHelper.get_delta_manifest(video_uuid, @delta_manifest_name)
|
||||
end
|
||||
|
||||
test "recent partial managment", %{video_uuid: video_uuid, table: table} do
|
||||
assert {:error, :file_not_found} == EtsHelper.get_recent_partial(video_uuid, @manifest_name)
|
||||
assert {:error, :file_not_found} == EtsHelper.get_delta_recent_partial(video_uuid, @delta_manifest_name)
|
||||
|
||||
EtsHelper.update_recent_partial(table, @recent_partial, @manifest_name)
|
||||
|
||||
assert {:ok, @recent_partial} == EtsHelper.get_recent_partial(video_uuid, @manifest_name)
|
||||
assert {:error, :file_not_found} == EtsHelper.get_delta_recent_partial(video_uuid, @delta_manifest_name)
|
||||
|
||||
EtsHelper.update_delta_recent_partial(table, @delta_recent_partial, @delta_manifest_name)
|
||||
|
||||
assert {:ok, @recent_partial} == EtsHelper.get_recent_partial(video_uuid, @manifest_name)
|
||||
assert {:ok, @delta_recent_partial} == EtsHelper.get_delta_recent_partial(video_uuid, @delta_manifest_name)
|
||||
end
|
||||
end
|
@ -1,152 +0,0 @@
|
||||
defmodule Algora.Pipeline.HLS.LLControllerTest do
|
||||
@moduledoc false
|
||||
|
||||
use ExUnit.Case, async: true
|
||||
|
||||
alias Algora.Pipeline.HLS.{LLController, EtsHelper}
|
||||
|
||||
@manifest "index.m3u8"
|
||||
@delta_manifest "manifest_delta_index.m3u8"
|
||||
@manifest_content "manifest"
|
||||
|
||||
@partial {1, 1}
|
||||
@partial_name "muxed_segment_1_index_1_part.m4s"
|
||||
@partial_content <<1, 2, 3>>
|
||||
|
||||
@next_partial {1, 2}
|
||||
@next_partial_name "muxed_segment_1_index_2_part.m4s"
|
||||
@next_partial_content <<1, 2, 3, 4>>
|
||||
|
||||
@future_partial_name "muxed_segment_1_index_4_part.m4s"
|
||||
|
||||
@videos_to_tables :videos_to_tables
|
||||
|
||||
setup %{tmp_dir: tmp_dir} do
|
||||
video_uuid = UUID.uuid4()
|
||||
directory = Path.join(tmp_dir, video_uuid)
|
||||
|
||||
# LLController is not removed at all in tests
|
||||
# It removes itself when parent process is killed
|
||||
LLController.start(video_uuid, directory)
|
||||
|
||||
%{video_uuid: video_uuid, directory: directory}
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "video managment", %{video_uuid: video_uuid, directory: directory} do
|
||||
{:error, {:already_started, _pid}} = LLController.start(video_uuid, directory)
|
||||
{:error, :already_exists} = EtsHelper.add_video(video_uuid)
|
||||
|
||||
LLController.stop(video_uuid)
|
||||
|
||||
# wait for ets to be removed
|
||||
Process.sleep(200)
|
||||
|
||||
assert {:error, :video_not_found} == EtsHelper.get_manifest(video_uuid, @manifest)
|
||||
|
||||
assert {:ok, _pid} = LLController.start(video_uuid, directory)
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "manifest request", %{video_uuid: video_uuid} do
|
||||
assert {:error, :file_not_found} == LLController.handle_manifest_request(video_uuid, @partial, @manifest)
|
||||
|
||||
{:ok, table} = get_table_for_video(video_uuid)
|
||||
|
||||
assert {:error, :file_not_found} == LLController.handle_manifest_request(video_uuid, @partial, @manifest)
|
||||
|
||||
EtsHelper.update_recent_partial(table, @partial, @manifest)
|
||||
EtsHelper.update_manifest(table, @manifest_content, @manifest)
|
||||
LLController.update_recent_partial(video_uuid, @partial, :manifest, @manifest)
|
||||
|
||||
assert {:ok, @manifest_content} == LLController.handle_manifest_request(video_uuid, @partial, @manifest)
|
||||
|
||||
task =
|
||||
Task.async(fn ->
|
||||
LLController.handle_manifest_request(video_uuid, @next_partial, @manifest)
|
||||
end)
|
||||
|
||||
assert nil == Task.yield(task, 500)
|
||||
|
||||
LLController.update_recent_partial(video_uuid, @next_partial, :manifest, @manifest)
|
||||
|
||||
assert {:ok, @manifest_content} == Task.await(task)
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "delta manifest request", %{video_uuid: video_uuid} do
|
||||
assert {:error, :file_not_found} ==
|
||||
LLController.handle_delta_manifest_request(video_uuid, @partial, @delta_manifest)
|
||||
|
||||
{:ok, table} = get_table_for_video(video_uuid)
|
||||
|
||||
assert {:error, :file_not_found} ==
|
||||
LLController.handle_delta_manifest_request(video_uuid, @partial, @delta_manifest)
|
||||
|
||||
EtsHelper.update_delta_recent_partial(table, @partial, @delta_manifest)
|
||||
EtsHelper.update_delta_manifest(table, @manifest_content, @delta_manifest)
|
||||
LLController.update_recent_partial(video_uuid, @partial, :delta_manifest, @delta_manifest)
|
||||
|
||||
assert {:ok, @manifest_content} ==
|
||||
LLController.handle_delta_manifest_request(video_uuid, @partial, @delta_manifest)
|
||||
|
||||
task =
|
||||
Task.async(fn ->
|
||||
LLController.handle_delta_manifest_request(video_uuid, @next_partial, @delta_manifest)
|
||||
end)
|
||||
|
||||
assert nil == Task.yield(task, 500)
|
||||
|
||||
LLController.update_recent_partial(video_uuid, @next_partial, :delta_manifest, @delta_manifest)
|
||||
|
||||
assert {:ok, @manifest_content} == Task.await(task)
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "partial request", %{video_uuid: video_uuid} do
|
||||
assert {:error, :file_not_found} ==
|
||||
LLController.handle_partial_request(video_uuid, @partial_name)
|
||||
|
||||
{:ok, table} = get_table_for_video(video_uuid)
|
||||
|
||||
assert {:error, :file_not_found} ==
|
||||
LLController.handle_partial_request(video_uuid, @partial_name)
|
||||
|
||||
EtsHelper.add_partial(table, @partial_content, @partial_name)
|
||||
|
||||
assert {:ok, @partial_content} ==
|
||||
LLController.handle_partial_request(video_uuid, @partial_name)
|
||||
|
||||
assert {:error, :file_not_found} ==
|
||||
LLController.handle_partial_request(video_uuid, "wrong_partial_name")
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "preload hint request", %{video_uuid: video_uuid} do
|
||||
{:ok, table} = get_table_for_video(video_uuid)
|
||||
|
||||
EtsHelper.add_partial(table, @partial_content, @partial_name)
|
||||
EtsHelper.update_recent_partial(table, @partial, @manifest)
|
||||
LLController.update_recent_partial(video_uuid, @partial, :manifest, @manifest)
|
||||
|
||||
task =
|
||||
Task.async(fn -> LLController.handle_partial_request(video_uuid, @next_partial_name) end)
|
||||
|
||||
assert nil == Task.yield(task, 500)
|
||||
|
||||
EtsHelper.add_partial(table, @next_partial_content, @next_partial_name)
|
||||
EtsHelper.update_recent_partial(table, @next_partial, @manifest)
|
||||
LLController.update_recent_partial(video_uuid, @next_partial, :manifest, @manifest)
|
||||
|
||||
assert {:ok, @next_partial_content} == Task.await(task)
|
||||
assert {:error, :file_not_found} ==
|
||||
LLController.handle_partial_request(video_uuid, @future_partial_name)
|
||||
end
|
||||
|
||||
defp get_table_for_video(video_uuid) do
|
||||
case :ets.lookup(@videos_to_tables, video_uuid) do
|
||||
[{^video_uuid, table}] -> {:ok, table}
|
||||
_ -> {:error, :not_found}
|
||||
end
|
||||
end
|
||||
end
|
@ -1,138 +0,0 @@
|
||||
defmodule Algora.Pipeline.StorageTest do
|
||||
@moduledoc false
|
||||
|
||||
use ExUnit.Case, async: true
|
||||
|
||||
alias Algora.Pipeline.HLS.{EtsHelper, LLController}
|
||||
alias Algora.Pipeline.Storage
|
||||
alias Algora.Library.Video
|
||||
|
||||
@segment_name "muxed_segment_0_manifest.m4s"
|
||||
@segment_content <<1, 2, 3>>
|
||||
|
||||
@partial_name "muxed_segment_0_manifest_0_part.m4s"
|
||||
@partial_content <<1, 2, 3, 4>>
|
||||
@partial_sn {0, 0}
|
||||
|
||||
@manifest_name "manifest.m3u8"
|
||||
@manifest_content "manifest_content"
|
||||
|
||||
@delta_manifest_name "manifest_delta.m3u8"
|
||||
@delta_manifest_content "delta_manifest_content"
|
||||
|
||||
@header_name "header"
|
||||
@header_content <<1, 2, 3, 4, 5>>
|
||||
|
||||
setup %{tmp_dir: tmp_dir} do
|
||||
video_uuid = UUID.uuid4()
|
||||
directory = Path.join(tmp_dir, video_uuid)
|
||||
|
||||
File.mkdir_p!(directory)
|
||||
|
||||
config = %Storage{directory: directory, video: %Video{ uuid: video_uuid }}
|
||||
|
||||
storage = Storage.init(config)
|
||||
{:ok, _pid} = LLController.start(video_uuid, directory)
|
||||
|
||||
%{storage: storage, directory: directory, video_uuid: video_uuid}
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "store partial", %{storage: storage, video_uuid: video_uuid} do
|
||||
{:ok, _storage} = store_partial(storage)
|
||||
|
||||
:timer.sleep(200)
|
||||
|
||||
assert {:ok, @partial_content} == EtsHelper.get_partial(video_uuid, @partial_name)
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "store manifest", %{storage: storage, video_uuid: video_uuid} do
|
||||
{:ok, storage} = store_partial(storage)
|
||||
{:ok, _storage} = store_manifest(storage)
|
||||
|
||||
:timer.sleep(200)
|
||||
|
||||
assert {:ok, @manifest_content} == EtsHelper.get_manifest(video_uuid, @manifest_name)
|
||||
assert {:ok, @partial_sn} == EtsHelper.get_recent_partial(video_uuid, @manifest_name)
|
||||
|
||||
pid = self()
|
||||
|
||||
spawn(fn ->
|
||||
{:ok, @manifest_content} = LLController.handle_manifest_request(video_uuid, @partial_sn, @manifest_name)
|
||||
send(pid, :manifest)
|
||||
end)
|
||||
|
||||
assert_receive(:manifest)
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
@tag timeout: 1_000
|
||||
test "store delta manifest", %{storage: storage, video_uuid: video_uuid} do
|
||||
{:ok, storage} = store_partial(storage)
|
||||
{:ok, _storage} = store_delta_manifest(storage)
|
||||
|
||||
:timer.sleep(200)
|
||||
|
||||
assert {:ok, @delta_manifest_content} == EtsHelper.get_delta_manifest(video_uuid, @delta_manifest_name)
|
||||
assert {:ok, @partial_sn} == EtsHelper.get_delta_recent_partial(video_uuid, @delta_manifest_name)
|
||||
|
||||
assert {:ok, @delta_manifest_content} ==
|
||||
LLController.handle_delta_manifest_request(video_uuid, @partial_sn, @delta_manifest_name)
|
||||
end
|
||||
|
||||
defp store_segment(storage) do
|
||||
Storage.store(
|
||||
:parent_id,
|
||||
@segment_name,
|
||||
@segment_content,
|
||||
:metadata,
|
||||
%{mode: :binary, type: :segment},
|
||||
storage
|
||||
)
|
||||
end
|
||||
|
||||
defp store_partial(storage) do
|
||||
Storage.store(
|
||||
:parent_id,
|
||||
@segment_name,
|
||||
@partial_content,
|
||||
%{partial_name: @partial_name, sequence_number: 0},
|
||||
%{mode: :binary, type: :partial_segment},
|
||||
storage
|
||||
)
|
||||
end
|
||||
|
||||
defp store_manifest(storage) do
|
||||
Storage.store(
|
||||
:parent_id,
|
||||
@manifest_name,
|
||||
@manifest_content,
|
||||
:metadata,
|
||||
%{mode: :text, type: :manifest},
|
||||
storage
|
||||
)
|
||||
end
|
||||
|
||||
defp store_delta_manifest(storage) do
|
||||
Storage.store(
|
||||
:parent_id,
|
||||
@delta_manifest_name,
|
||||
@delta_manifest_content,
|
||||
:metadata,
|
||||
%{mode: :text, type: :manifest},
|
||||
storage
|
||||
)
|
||||
end
|
||||
|
||||
defp store_header(storage) do
|
||||
Storage.store(
|
||||
:parent_id,
|
||||
@header_name,
|
||||
@header_content,
|
||||
:metadata,
|
||||
%{mode: :binary, type: :header},
|
||||
storage
|
||||
)
|
||||
end
|
||||
end
|
@ -1,4 +1,2 @@
|
||||
|
||||
ExUnit.start(capture_log: true)
|
||||
|
||||
ExUnit.start()
|
||||
Ecto.Adapters.SQL.Sandbox.mode(Algora.Repo, :manual)
|
||||
|
Loading…
x
Reference in New Issue
Block a user