1
0
mirror of https://github.com/algora-io/tv.git synced 2025-10-30 23:07:56 +02:00

Serve low-latency HLS partials from S3

Upload partial segments in parallel

Wait for uploads to finish before sending preload hint by using
selective receive and waiting for the uploading process to exit

Redirect requests for partials to S3 after waiting for the parital to be uploaded

Remove partials from s3 when they are removed from ets

Set partial segment duration to 1 second
This commit is contained in:
ty
2024-11-11 16:45:47 -05:00
parent 5d6728c0f1
commit cdc836c8e2
7 changed files with 110 additions and 32 deletions

View File

@@ -10,7 +10,7 @@ defmodule Algora.Pipeline do
@segment_duration_seconds 6
@segment_duration Time.seconds(@segment_duration_seconds)
@partial_segment_duration_milliseconds 200
@partial_segment_duration_milliseconds 1000
@partial_segment_duration Time.milliseconds(@partial_segment_duration_milliseconds)
@app "live"
@terminate_after String.to_integer(Algora.config([:resume_rtmp_timeout])) * 1000

View File

@@ -154,9 +154,15 @@ defmodule Algora.Pipeline.HLS.EtsHelper do
end
defp lookup_helper(table, key, error) do
case :ets.lookup(table, key) do
[{^key, val}] -> {:ok, val}
[] -> {:error, error}
try do
case :ets.lookup(table, key) do
[{^key, val}] -> {:ok, val}
[] -> {:error, error}
end
rescue
error in ArgumentError ->
# table not found
{:error, error}
end
end

View File

@@ -69,18 +69,17 @@ defmodule Algora.Pipeline.HLS.LLController do
@spec handle_partial_request(Video.uuid(), String.t()) ::
{:ok, binary()} | {:error, atom()}
def handle_partial_request(video_uuid, filename) do
with {:ok, partial} <- EtsHelper.get_partial(video_uuid, filename) do
{:ok, partial}
with {:ok, :ready} <- EtsHelper.get_partial(video_uuid, filename) do
{:redirect, filename}
else
{:error, :file_not_found} ->
case preload_hint?(video_uuid, filename) do
{:ok, true} ->
wait_for_partial_ready(video_uuid, filename)
with {:ok, true} <- preload_hint?(video_uuid, filename),
{:ok, :ready} <- wait_for_partial_ready(video_uuid, filename) do
{:redirect, filename}
else
_other ->
{:error, :file_not_found}
end
error ->
error
end

View File

@@ -344,6 +344,11 @@ defmodule Algora.Pipeline.Sink do
end
end
@impl true
def handle_info({:DOWN, _rev, :process, _pid, _reason}, _ctx, state) do
{[], state}
end
defp serialize_track_name(track_id) when is_binary(track_id) do
valid_filename_regex = ~r/^[^\/:*?"<>|]+$/

View File

@@ -16,7 +16,8 @@ defmodule Algora.Pipeline.Storage do
video_header: <<>>,
video_segment: <<>>,
setup_completed?: false,
manifest_uploader: nil
manifest_uploader: nil,
partial_uploaders: %{}
]
@type partial_ets_key :: String.t()
@@ -33,11 +34,13 @@ defmodule Algora.Pipeline.Storage do
video_header: <<>>,
video_segment: <<>>,
setup_completed?: boolean(),
manifest_uploader: pid()
manifest_uploader: pid(),
partial_uploaders: %{ manifest_name() => pid() }
}
@ets_cached_duration_in_segments 4
@delta_manifest_suffix "_delta.m3u8"
@partial_uploader_send_update_timeout 1000
@impl true
def init(state) do
@@ -86,7 +89,7 @@ defmodule Algora.Pipeline.Storage do
state =
process_contents(parent_id, name, contents, metadata, ctx, state)
|> update_sequence_numbers(sequence_number, manifest_name)
|> add_partial_to_ets(name, partial_name, contents, manifest_name)
|> add_partial(name, ctx, partial_name, contents, manifest_name)
{:ok, state}
end
@@ -101,12 +104,12 @@ defmodule Algora.Pipeline.Storage do
:ok = GenServer.cast(uploader, {:upload, filename, content, upload_opts(context)})
unless filename == "index.m3u8" do
if filename == "index.m3u8" do
{:ok, state}
else
add_manifest_to_ets(filename, content, state)
send_update(filename, state)
end
{:ok, state}
end
defp cache_header(
@@ -131,35 +134,34 @@ defmodule Algora.Pipeline.Storage do
])
end
defp add_partial_to_ets(
defp add_partial(
%{
partials_in_ets: partials_in_ets,
sequences: sequences,
video: video
partial_uploaders: partial_uploaders
} = state,
ctx,
segment_name,
partial_name,
content,
manifest_name
) do
broadcast!(video.uuid, [LLController, :add_partial, [
video.uuid, content, segment_name, partial_name
]])
{:ok, pid} = upload_partial(state, ctx, segment_name, partial_name, content, manifest_name)
if partial = sequences[manifest_name] do
partials = Map.get(partials_in_ets, manifest_name, [])
partials_in_ets = Map.put(partials_in_ets, manifest_name, [{partial, partial_name} | partials])
%{state | partials_in_ets: partials_in_ets}
partial_uploaders = Map.put(partial_uploaders, manifest_name, pid)
%{state | partials_in_ets: partials_in_ets, partial_uploaders: partial_uploaders}
else
state
end
end
defp remove_partials_from_ets(
defp remove_partials(
%{
partials_in_ets: partials_in_ets,
sequences: sequences,
video: video,
sequences: sequences
} = state,
manifest_name
) do
@@ -169,8 +171,9 @@ defmodule Algora.Pipeline.Storage do
segment_sn + (@ets_cached_duration_in_segments) > curr_segment_sn
end)
Enum.each(partial_to_be_removed, fn {_sn, partial_name} ->
broadcast!(video.uuid, [LLController, :delete_partial, [video.uuid, partial_name]])
Enum.reduce(partial_to_be_removed, state, fn {_sn, partial_name}, state ->
{:ok, _pid} = remove_uploaded_partial(state, partial_name)
state
end)
partials_in_ets = Map.put(partials_in_ets, manifest_name, partials)
@@ -180,24 +183,75 @@ defmodule Algora.Pipeline.Storage do
end
end
defp upload_partial(state, ctx, segment_name, partial_name, content, _manifest_name) do
Task.Supervisor.start_child(Algora.TaskSupervisor, fn ->
path = "#{state.video.uuid}/#{partial_name}"
with {:ok, _} <- Algora.Storage.upload(content, path, upload_opts(ctx)) do
broadcast!(state.video.uuid, [LLController, :add_partial, [
state.video.uuid, :ready, segment_name, partial_name
]])
else
err ->
Membrane.Logger.error("Failed to upload partial #{path} #{inspect(err)}")
end
end)
end
defp remove_uploaded_partial(state, partial_name) do
path = "#{state.video.uuid}/#{partial_name}"
Task.Supervisor.start_child(Algora.TaskSupervisor, fn ->
with {:ok, _resp} <- Algora.Storage.remove(path) do
broadcast!(state.video.uuid, [LLController, :delete_partial, [state.video.uuid, partial_name]])
else
err ->
Membrane.Logger.error("Failed to remove uploaded partial #{path}: #{inspect(err)}")
end
end)
end
defp broadcast!(video_uuid, msg), do: LLController.broadcast!(video_uuid, msg)
defp send_update(filename, %{
video: video,
sequences: sequences
}) do
sequences: sequences,
partial_uploaders: partial_uploaders
} = state) do
manifest =
if(String.ends_with?(filename, @delta_manifest_suffix),
do: :delta_manifest,
else: :manifest
)
partial_uploader = partial_uploaders[filename]
state = with true <- is_pid(partial_uploader),
true <- Process.alive?(partial_uploader),
ref <- Process.monitor(partial_uploader) do
partial_uploaders = receive do
{:DOWN, ^ref, :process, ^partial_uploader, :normal} ->
Map.delete(partial_uploaders, partial_uploader)
{:DOWN, ^ref, :process, ^partial_uploader, reason} ->
Membrane.Logger.error(
"Partial uploader for #{video.uuid}/#{filename} exited with with reason #{inspect(reason)}"
)
Map.delete(partial_uploaders, partial_uploader)
after
@partial_uploader_send_update_timeout ->
partial_uploaders
end
%{state | partial_uploaders: Map.put(partial_uploaders, filename, nil)}
else
_other -> state
end
manifest_name = String.replace(filename, @delta_manifest_suffix, ".m3u8")
if partial = sequences[manifest_name] do
broadcast!(video.uuid, [LLController, :update_recent_partial, [
video.uuid, partial, manifest, filename
]])
end
{:ok, state}
end
defp update_sequence_numbers(
@@ -217,7 +271,7 @@ defmodule Algora.Pipeline.Storage do
|> then(&Map.put(state, :sequences, &1))
# If there is a new segment we want to remove partials that are too old from ets
if new_segment? do
remove_partials_from_ets(state, manifest_name)
remove_partials(state, manifest_name)
else
state
end

View File

@@ -69,4 +69,14 @@ defmodule Algora.Storage do
err -> err
end
end
def remove(remote_path, opts \\ []) do
remove_from_bucket(remote_path, :media, opts)
end
def remove_from_bucket(remote_path, bucket, opts) do
ExAws.S3.delete_object(Algora.config([:buckets, bucket]), remote_path, opts)
|> ExAws.request([])
end
end

View File

@@ -98,6 +98,10 @@ defmodule AlgoraWeb.HLSContentController do
Conn.send_resp(conn, 200, file)
{:redirect, partial_name} ->
redirect(conn,
external: Algora.Storage.to_absolute(:video, video_uuid, partial_name))
{:error, :invalid_path} ->
{:error, :bad_request, "Invalid filename, got #{filename}"}