1
0
mirror of https://github.com/algora-io/tv.git synced 2025-10-30 23:07:56 +02:00

remove and reconnect forwarded rtmp connections

This commit is contained in:
ty
2024-10-28 22:23:26 -04:00
parent 93069e09e1
commit 304c8c3b49

View File

@@ -27,6 +27,7 @@ defmodule Algora.Pipeline do
data_frame: nil,
playing: false,
finalized: false,
forwarding: [],
]
def segment_duration(), do: @segment_duration_seconds
@@ -95,7 +96,8 @@ defmodule Algora.Pipeline do
reconnect: reconnect
}
{:ok, state} = setup_extras!(state)
setup_forwarding!(state)
setup_extras!(state)
spec = [
#
@@ -200,6 +202,7 @@ defmodule Algora.Pipeline do
]
send(self(), :link_tracks)
setup_forwarding!(state)
{
[
@@ -255,6 +258,7 @@ defmodule Algora.Pipeline do
remove_link: {:funnel_video, Pad.ref(:input, reconnect)},
remove_link: {:funnel_audio, Pad.ref(:input, reconnect)},
remove_link: {:sink, Pad.ref(:input, "audio_master")},
remove_children: state.forwarding,
] ++ if transcode_formats = transcode_formats(state.data_frame) do
Enum.flat_map(transcode_formats, fn(%{track_name: track_name}) ->
[
@@ -270,26 +274,30 @@ defmodule Algora.Pipeline do
]
end |> Enum.filter(& &1)
{actions, state}
{actions, %{state | forwarding: []}}
end
def handle_info({:forward_rtmp, url, ref}, _ctx, state) do
spec = [
#
child(ref, %Membrane.RTMP.Sink{rtmp_url: url}),
if Enum.member?(state.forwarding, ref) do
{[], state}
else
spec = [
#
child(ref, %Membrane.RTMP.Sink{rtmp_url: url}),
#
get_child(:tee_audio)
|> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000)
|> get_child(ref),
#
get_child(:tee_audio)
|> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000)
|> get_child(ref),
#
get_child(:tee_video)
|> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000)
|> get_child(ref)
]
#
get_child(:tee_video)
|> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000)
|> get_child(ref)
]
{[spec: spec], state}
{[spec: spec], %{state | forwarding: [ref | state.forwarding]}}
end
end
def handle_info(:multicast_algora, _ctx, state) do
@@ -441,18 +449,20 @@ defmodule Algora.Pipeline do
])
end
defp setup_extras!(%{ video: video, user: user } = state) do
defp setup_forwarding!(%{video: video} = state) do
destinations = Algora.Accounts.list_active_destinations(video.user_id)
for {destination, i} <- Enum.with_index(destinations) do
for destination <- destinations do
url =
URI.new!(destination.rtmp_url)
|> URI.append_path("/" <> destination.stream_key)
|> URI.to_string()
send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{i}")})
send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{destination.id}")})
end
end
defp setup_extras!(%{video: video, user: user} = state) do
if url = Algora.Accounts.get_restream_ws_url(user) do
Task.Supervisor.start_child(
Algora.TaskSupervisor,
@@ -474,8 +484,6 @@ defmodule Algora.Pipeline do
{Algora.Youtube.Chat.Fetcher, %{video: video, youtube_handle: youtube_handle}}
)
end
{:ok, state}
end
defp terminate_later(state), do: terminate_later(state, @terminate_after)