1
0
mirror of https://github.com/algora-io/tv.git synced 2025-10-30 23:07:56 +02:00

rewrite from eager puts to lazy gets

This commit is contained in:
zafer
2024-10-25 00:55:12 +03:00
parent 67059c630d
commit fbeeb48b65
5 changed files with 41 additions and 31 deletions

View File

@@ -50,7 +50,7 @@ defmodule Algora.Application do
AlgoraWeb.Endpoint,
AlgoraWeb.Embed.Endpoint,
# Start the LL-HLS controller registry
{Registry, keys: :unique, name: Algora.LLControllerRegistry},
# {Registry, keys: :unique, name: Algora.LLControllerRegistry},
# Start the RTMP server
%{
id: Membrane.RTMP.Source.TcpServer,

View File

@@ -148,7 +148,8 @@ defmodule Algora.Pipeline do
dir = Path.join(Admin.tmp_dir(), video.uuid)
:rpc.multicall(LLController, :start, [video.uuid, dir])
LLController.start(video.uuid, dir)
# :rpc.multicall(LLController, :start, [video.uuid, dir])
{:ok, video} =
Algora.Library.reconcile_livestream(

View File

@@ -6,7 +6,6 @@ defmodule Algora.Pipeline.HLS.LLController do
alias Algora.Pipeline.HLS.EtsHelper
alias Algora.Library.Video
alias Algora.Admin
@enforce_keys [:video_uuid, :directory, :video_pid]
defstruct @enforce_keys ++
@@ -262,8 +261,8 @@ defmodule Algora.Pipeline.HLS.LLController do
end
@impl true
def handle_cast({:apply, [module, function, args]}, state) do
apply(module, function, args)
def handle_cast({:cast, [function, args]}, state) do
apply(__MODULE__, function, args)
{:noreply, state}
end
@@ -272,6 +271,12 @@ defmodule Algora.Pipeline.HLS.LLController do
{:stop, :normal, state}
end
@impl true
def handle_call({:call, [function, args]}, _from, state) do
result = apply(__MODULE__, function, args)
{:reply, result, state}
end
@impl true
def terminate(_reason, %{video_uuid: video_uuid}) do
EtsHelper.remove_video(video_uuid)
@@ -371,7 +376,8 @@ defmodule Algora.Pipeline.HLS.LLController do
|> List.to_tuple()
end
def registry_id(video_uuid), do: {:via, Registry, {Algora.LLControllerRegistry, video_uuid}}
def registry_id(video_uuid), do: {:global, {:ll_controller, video_uuid}}
# def registry_id(video_uuid), do: {:via, Registry, {Algora.LLControllerRegistry, video_uuid}}
defp send_partial_ready(waiting_pids) do
Enum.each(waiting_pids, fn pid -> send(pid, :manifest_ready) end)
@@ -398,13 +404,11 @@ defmodule Algora.Pipeline.HLS.LLController do
relative_path != path and relative_path != "."
end
def broadcast!(video_uuid, [_module, _function, _args] = msg) do
for node <- Admin.nodes() do
:rpc.cast(node, Algora.Pipeline.HLS.LLController, :apply, [video_uuid, msg])
end
def cast(video_uuid, [function, args]) do
GenServer.cast(registry_id(video_uuid), {:cast, [function, args]})
end
def apply(video_uuid, msg) do
GenServer.cast(registry_id(video_uuid), {:apply, msg})
def call(video_uuid, [function, args]) do
GenServer.call(registry_id(video_uuid), {:call, [function, args]})
end
end

View File

@@ -92,7 +92,7 @@ defmodule Algora.Pipeline.Storage do
content,
%__MODULE__{video: video} = state
) do
broadcast!(video.uuid, [LLController, :write_to_file, [video.uuid, filename, content]])
LLController.cast(video.uuid, [:write_to_file, [video.uuid, filename, content]])
unless filename == "index.m3u8" do
add_manifest_to_ets(filename, content, state)
@@ -103,14 +103,13 @@ defmodule Algora.Pipeline.Storage do
end
defp add_manifest_to_ets(filename, manifest, %{video: video}) do
broadcast!(video.uuid, [
LLController,
manifest_type =
if(String.ends_with?(filename, @delta_manifest_suffix),
do: :update_delta_manifest,
else: :update_manifest
),
[video.uuid, manifest]
])
)
LLController.cast(video.uuid, [manifest_type, [video.uuid, manifest]])
end
defp add_partial_to_ets(
@@ -123,7 +122,7 @@ defmodule Algora.Pipeline.Storage do
partial_name,
content
) do
broadcast!(video.uuid, [LLController, :add_partial, [video.uuid, content, partial_name]])
LLController.cast(video.uuid, [:add_partial, [video.uuid, content, partial_name]])
partial = {segment_sn, partial_sn}
%{state | partials_in_ets: [{partial, partial_name} | partials_in_ets]}
@@ -142,14 +141,12 @@ defmodule Algora.Pipeline.Storage do
end)
Enum.each(partial_to_be_removed, fn {_sn, partial_name} ->
broadcast!(video.uuid, [LLController, :delete_partial, [video.uuid, partial_name]])
LLController.cast(video.uuid, [:delete_partial, [video.uuid, partial_name]])
end)
%{state | partials_in_ets: partials_in_ets}
end
defp broadcast!(video_uuid, msg), do: LLController.broadcast!(video_uuid, msg)
defp send_update(filename, %{
video: video,
segment_sn: segment_sn,
@@ -163,7 +160,7 @@ defmodule Algora.Pipeline.Storage do
partial = {segment_sn, partial_sn}
broadcast!(video.uuid, [LLController, :update_recent_partial, [video.uuid, partial, manifest]])
LLController.cast(video.uuid, [:update_recent_partial, [video.uuid, partial, manifest]])
end
defp update_sequence_numbers(
@@ -248,18 +245,26 @@ defmodule Algora.Pipeline.Storage do
contents,
%{independent?: true},
%{type: :partial_segment, mode: :binary},
%{setup_completed?: false, video: video, video_header: video_header, segment_sn: segment_sn} = state
%{
setup_completed?: false,
video: video,
video_header: video_header,
segment_sn: segment_sn
} = state
) do
marker = Thumbnails.find_marker(segment_sn)
if (marker) do
if marker do
Task.Supervisor.start_child(Algora.TaskSupervisor, fn ->
Thumbnails.store_thumbnail(video, video_header, contents, marker)
end)
end
%{state | setup_completed?: if(marker, do: Thumbnails.is_last_marker?(marker), else: false), video_segment: contents}
%{
state
| setup_completed?: if(marker, do: Thumbnails.is_last_marker?(marker), else: false),
video_segment: contents
}
end
defp process_contents(

View File

@@ -64,9 +64,9 @@ defmodule AlgoraWeb.HLSContentController do
result =
if String.ends_with?(filename, "_delta.m3u8") do
LLController.handle_delta_manifest_request(video_uuid, partial)
LLController.call(video_uuid, [:handle_delta_manifest_request, [video_uuid, partial]])
else
LLController.handle_manifest_request(video_uuid, partial)
LLController.call(video_uuid, [:handle_manifest_request, [video_uuid, partial]])
end
case result do
@@ -84,9 +84,9 @@ defmodule AlgoraWeb.HLSContentController do
def index(conn, %{"video_uuid" => video_uuid, "filename" => filename}) do
result =
if String.ends_with?(filename, "_part.m4s") do
LLController.handle_partial_request(video_uuid, filename)
LLController.call(video_uuid, [:handle_partial_request, [video_uuid, filename]])
else
LLController.handle_file_request(video_uuid, filename)
LLController.call(video_uuid, [:handle_file_request, [video_uuid, filename]])
end
case result do