1
0
mirror of https://github.com/algora-io/tv.git synced 2025-02-04 01:53:25 +02:00

Validate stream_key inside Pipeline (#98)

Allows using /live + <stream_key> or /<stream-key> or /<stream-key> +
<stream_key>, but no other combination

Moves all post-connection logic out of the Validator and into the
Pipeline

Delays calling init_livestream! until the stream is validated

Removes video_id from Validator config

Delays attaching HLS output until the stream is validated

User interface for stream key
This commit is contained in:
ty 2024-09-19 08:35:22 -04:00 committed by GitHub
parent c3e6df94ec
commit 4d2a5455d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 253 additions and 112 deletions

View File

@ -299,6 +299,17 @@ const Hooks = {
window.addEventListener("scroll", onScroll, { passive: true });
},
},
CopyToClipboard: {
value() { return this.el.dataset.value },
notice() { return this.el.dataset.notice },
mounted() {
this.el.addEventListener("click", () => {
navigator.clipboard.writeText(this.value()).then(() => {
this.pushEvent("copied_to_clipboard", {notice: this.notice()})
})
})
}
},
} satisfies Record<string, Partial<ViewHook> & Record<string, unknown>>;
// Accessible focus handling

View File

@ -13,7 +13,8 @@ config :algora,
"Algora TV is an interactive livestreaming & video sharing service for developers.",
admin_emails: ["zafer@algora.io", "ioannis@algora.io"],
ecto_repos: [Algora.Repo.Local],
rtmp_port: 9006
rtmp_port: 9006,
rtmp_path: "live"
# Configures the endpoint
config :algora, AlgoraWeb.Endpoint,

View File

@ -189,13 +189,10 @@ defmodule Algora.Accounts do
hashed_token = :crypto.hash(:sha256, token)
encoded_token = Base.url_encode64(hashed_token, padding: false)
{:ok, _} =
user
|> change()
|> put_change(:stream_key, encoded_token)
|> Repo.update()
{:ok, user}
end
def list_destinations(user_id) do

View File

@ -11,66 +11,25 @@ defmodule Algora.Pipeline do
@impl true
def handle_init(_context, socket: socket) do
video = Library.init_livestream!()
dir = Path.join(Admin.tmp_dir(), video.uuid)
:rpc.multicall(LLController, :start, [video.uuid, dir])
spec = [
#
child(:src, %Algora.Pipeline.SourceBin{
socket: socket,
validator: %Algora.Pipeline.MessageValidator{video_id: video.id, pid: self()}
}),
#
child(:sink, %Algora.Pipeline.SinkBin{
video_uuid: video.uuid,
hls_mode: :muxed_av,
mode: :live,
manifest_module: Algora.Pipeline.HLS,
target_window_duration: :infinity,
persist?: false,
storage: %Algora.Pipeline.Storage{video: video, directory: dir}
validator: %Algora.Pipeline.MessageValidator{pid: self()}
}),
#
get_child(:src)
|> via_out(:audio)
|> child(:tee_audio, Membrane.Tee.Master),
|> child(:tee_audio, Membrane.Tee.Parallel),
#
get_child(:src)
|> via_out(:video)
|> child(:tee_video, Membrane.Tee.Master),
#
get_child(:tee_audio)
|> via_out(:master)
|> via_in(Pad.ref(:input, :audio),
options: [
encoding: :AAC,
segment_duration: @segment_duration,
partial_segment_duration: @partial_segment_duration
]
)
|> get_child(:sink),
#
get_child(:tee_video)
|> via_out(:master)
|> via_in(Pad.ref(:input, :video),
options: [
encoding: :H264,
segment_duration: @segment_duration,
partial_segment_duration: @partial_segment_duration
]
)
|> get_child(:sink)
|> child(:tee_video, Membrane.Tee.Parallel)
]
{[spec: spec], %{socket: socket, video: video}}
{[spec: spec], %{socket: socket, video: nil, stream_key: nil}}
end
@impl true
@ -85,6 +44,8 @@ defmodule Algora.Pipeline do
end
@impl true
def handle_child_notification(:end_of_stream, _element, _ctx, %{stream_key: nil} = state), do:
{[terminate: :normal], state}
def handle_child_notification(:end_of_stream, _element, _ctx, state) do
Algora.Library.toggle_streamer_live(state.video, false)
@ -98,6 +59,11 @@ defmodule Algora.Pipeline do
{[], state}
end
@impl true
def handle_child_notification({:stream_validation_error, _phase, _reason}, _element, _ctx, state) do
{[terminate: :normal], state}
end
@impl true
def handle_child_notification(_notification, _element, _ctx, state) do
{[], state}
@ -123,13 +89,11 @@ defmodule Algora.Pipeline do
#
get_child(:tee_audio)
|> via_out(:copy)
|> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000)
|> get_child(ref),
#
get_child(:tee_video)
|> via_out(:copy)
|> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000)
|> get_child(ref)
]
@ -171,4 +135,105 @@ defmodule Algora.Pipeline do
def handle_call(:get_video_id, _ctx, state) do
{[{:reply, state.video.id}], state}
end
def handle_call({:validate_stream_key, stream_key}, _ctx, %{ stream_key: nil } = state) do
if user = Algora.Accounts.get_user_by(stream_key: stream_key) do
video = Library.init_livestream!()
dir = Path.join(Admin.tmp_dir(), video.uuid)
:rpc.multicall(LLController, :start, [video.uuid, dir])
{:ok, video} =
Algora.Library.reconcile_livestream(
%Algora.Library.Video{id: video.id},
stream_key
)
destinations = Algora.Accounts.list_active_destinations(video.user_id)
for {destination, i} <- Enum.with_index(destinations) do
url =
URI.new!(destination.rtmp_url)
|> URI.append_path("/" <> destination.stream_key)
|> URI.to_string()
send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{i}")})
end
if url = Algora.Accounts.get_restream_ws_url(user) do
Task.Supervisor.start_child(
Algora.TaskSupervisor,
fn -> Algora.Restream.Websocket.start_link(%{url: url, user: user, video: video}) end,
restart: :transient
)
end
youtube_handle =
case user.id do
307 -> "@heyandras"
9 -> "@dragonroyale"
_ -> nil
end
if youtube_handle do
DynamicSupervisor.start_child(
Algora.Youtube.Chat.Supervisor,
{Algora.Youtube.Chat.Fetcher, %{video: video, youtube_handle: youtube_handle}}
)
end
spec = [
#
child(:sink, %Algora.Pipeline.SinkBin{
video_uuid: video.uuid,
hls_mode: :muxed_av,
mode: :live,
manifest_module: Algora.Pipeline.HLS,
target_window_duration: :infinity,
persist?: false,
storage: %Algora.Pipeline.Storage{video: video, directory: dir}
}),
#
get_child(:tee_audio)
|> via_in(Pad.ref(:input, :audio),
options: [
encoding: :AAC,
segment_duration: @segment_duration,
partial_segment_duration: @partial_segment_duration
]
)
|> get_child(:sink),
#
get_child(:tee_video)
|> via_in(Pad.ref(:input, :video),
options: [
encoding: :H264,
segment_duration: @segment_duration,
partial_segment_duration: @partial_segment_duration
]
)
|> get_child(:sink)
]
{
[reply: {:ok, "success"}, spec: spec],
%{ state | stream_key: stream_key, video: video }
}
else
{[reply: {:error, "invalid stream key"}, terminate: :normal], state}
end
end
# ReleaseStream message when stream key is set with url
def handle_call({:validate_stream_key, ""}, _ctx, %{stream_key: key} = state) when is_binary(key), do:
{[reply: {:ok, "success"}], state}
# ReleaseStream message when stream key both in url and as stream key
def handle_call({:validate_stream_key, key}, _ctx, %{stream_key: key} = state), do:
{[reply: {:ok, "success"}], state}
# Release Stream message with a stream key differing from the stream key in the url
def handle_call({:validate_stream_key, _}, _ctx, %{stream_key: key} = state) when is_binary(key), do:
{[reply: {:error, "stream already setup"}, terminate: :normal], state}
end

View File

@ -3,55 +3,21 @@ defmodule Algora.Pipeline.MessageValidator do
end
defimpl Membrane.RTMP.MessageValidator, for: Algora.Pipeline.MessageValidator do
alias Membrane.RTMP.Messages
@app_name Algora.config([:rtmp_path])
@impl true
def validate_connect(impl, message) do
{:ok, video} =
Algora.Library.reconcile_livestream(
%Algora.Library.Video{id: impl.video_id},
message.app
)
destinations = Algora.Accounts.list_active_destinations(video.user_id)
for {destination, i} <- Enum.with_index(destinations) do
url =
URI.new!(destination.rtmp_url)
|> URI.append_path("/" <> destination.stream_key)
|> URI.to_string()
send(impl.pid, {:forward_rtmp, url, String.to_atom("rtmp_sink_#{i}")})
end
user = Algora.Accounts.get_user!(video.user_id)
if url = Algora.Accounts.get_restream_ws_url(user) do
Task.Supervisor.start_child(
Algora.TaskSupervisor,
fn -> Algora.Restream.Websocket.start_link(%{url: url, user: user, video: video}) end,
restart: :transient
)
end
youtube_handle =
case user.id do
307 -> "@heyandras"
9 -> "@dragonroyale"
_ -> nil
end
if youtube_handle do
DynamicSupervisor.start_child(
Algora.Youtube.Chat.Supervisor,
{Algora.Youtube.Chat.Fetcher, %{video: video, youtube_handle: youtube_handle}}
)
end
{:ok, "connect success"}
def validate_connect(_impl, %Messages.Connect{app: @app_name}), do:
{:ok, "connected"}
def validate_connect(impl, %Messages.Connect{app: stream_key}) do
# allow url based stream keys to work
GenServer.call(impl.pid, {:validate_stream_key, stream_key})
end
@impl true
def validate_release_stream(_impl, _message) do
{:ok, "release stream success"}
def validate_release_stream(impl, %Messages.ReleaseStream{stream_key: stream_key}) do
GenServer.call(impl.pid, {:validate_stream_key, stream_key})
end
@impl true

View File

@ -21,22 +21,96 @@ defmodule AlgoraWeb.SettingsLive do
<.input field={@form[:name]} label="Name" />
<.input label="Email" name="email" value={@current_user.email} disabled />
<.input field={@form[:channel_tagline]} label="Stream tagline" />
<div>
<.input
label="Stream URL"
name="stream_url"
value={"rtmp://#{URI.parse(AlgoraWeb.Endpoint.url()).host}:#{Algora.config([:rtmp_port])}/#{@current_user.stream_key}"}
disabled
/>
<p class="mt-2 text-sm text-gray-400">
<%= "Paste into OBS Studio > File > Settings > Stream > Server" %>
</p>
</div>
<:actions>
<.button>Save</.button>
</:actions>
</.simple_form>
</div>
<div class="space-y-6 bg-white/5 rounded-lg p-6 ring-1 ring-white/15">
<.header>
Stream Connection
<:subtitle>
Connection details for live streaming with RTMP
</:subtitle>
</.header>
<div class="w-full">
<div class="flex justify-between items-center">
<label class="block text-sm font-semibold leading-6 text-gray-100 mb-2">Stream URL</label>
</div>
<div class="flex items-center">
<div class="relative w-full">
<.input
class="w-full p-2.5 test-sm mr-16 py-1 px-2 leading-tight block ext-sm"
name="stream_url"
value={@stream_url}
disabled
/>
</div>
<button
id="copy_stream_url"
class="flex-shrink-0 z-10 inline-flex items-center py-3 px-4 ml-2 text-sm font-medium text-center rounded bg-gray-700 hover:bg-gray-600"
phx-hook="CopyToClipboard"
data-value={@stream_url}
data-notice="Copied Stream Url">
<span id="default-icon">
<svg class="w-4 h-4" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="currentColor" viewBox="0 0 18 20">
<path d="M16 1h-3.278A1.992 1.992 0 0 0 11 0H7a1.993 1.993 0 0 0-1.722 1H2a2 2 0 0 0-2 2v15a2 2 0 0 0 2 2h14a2 2 0 0 0 2-2V3a2 2 0 0 0-2-2Zm-3 14H5a1 1 0 0 1 0-2h8a1 1 0 0 1 0 2Zm0-4H5a1 1 0 0 1 0-2h8a1 1 0 1 1 0 2Zm0-5H5a1 1 0 0 1 0-2h2V2h4v2h2a1 1 0 1 1 0 2Z"/>
</svg>
</span>
<span id="success-icon" class="hidden inline-flex items-center">
<svg class="w-4 h-4" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 16 12">
<path stroke="currentColor" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M1 5.917 5.724 10.5 15 1.5"/>
</svg>
</span>
</button>
</div>
<p class="mt-2 text-sm text-gray-400">
<%= "Paste into OBS Studio > File > Settings > Stream > Server" %>
</p>
</div>
<div class="w-full">
<div class="flex justify-between items-center">
<label class="block text-sm font-semibold leading-6 text-gray-100 mb-2">Stream Key</label>
</div>
<div class="flex items-center">
<button
phx-click="regenerate_stream_key"
class="flex-shrink-0 z-10 inline-flex items-center py-2 px-4 mr-2 text-sm font-medium text-center rounded bg-gray-700 hover:bg-gray-600">
Generate
</button>
<div class="relative w-full">
<.input
id="stream_key"
name="stream_key"
class="w-full p-2.5 test-sm mr-16 py-1 px-2 leading-tight block ext-sm"
value={@stream_key}
disabled
/>
</div>
<button
id="copy_stream_key"
class="flex-shrink-0 z-10 inline-flex items-center py-3 px-4 ml-2 text-sm font-medium text-center rounded bg-gray-700 hover:bg-gray-600"
phx-hook="CopyToClipboard"
data-value={@stream_key}
data-notice="Copied Stream Key">
<span id="default-icon">
<svg class="w-4 h-4" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="currentColor" viewBox="0 0 18 20">
<path d="M16 1h-3.278A1.992 1.992 0 0 0 11 0H7a1.993 1.993 0 0 0-1.722 1H2a2 2 0 0 0-2 2v15a2 2 0 0 0 2 2h14a2 2 0 0 0 2-2V3a2 2 0 0 0-2-2Zm-3 14H5a1 1 0 0 1 0-2h8a1 1 0 0 1 0 2Zm0-4H5a1 1 0 0 1 0-2h8a1 1 0 1 1 0 2Zm0-5H5a1 1 0 0 1 0-2h2V2h4v2h2a1 1 0 1 1 0 2Z"/>
</svg>
</span>
<span id="success-icon" class="hidden inline-flex items-center">
<svg class="w-4 h-4" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 16 12">
<path stroke="currentColor" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M1 5.917 5.724 10.5 15 1.5"/>
</svg>
</span>
</button>
</div>
<p class="mt-2 text-sm text-gray-400">
<%= "Paste into OBS Studio > File > Settings > Stream > Stream Key" %>
</p>
</div>
</div>
<div class="space-y-6 bg-white/5 rounded-lg p-6 ring-1 ring-white/15">
<.header>
Integrations
@ -153,6 +227,10 @@ defmodule AlgoraWeb.SettingsLive do
destinations = Accounts.list_destinations(current_user.id)
destination_changeset = Accounts.change_destination(%Destination{})
connected_with_restream = Accounts.has_restream_token?(current_user)
rtmp_host = case URI.parse(AlgoraWeb.Endpoint.url()).host do
"localhost" -> "127.0.0.1"
host -> host
end
{:ok,
socket
@ -161,7 +239,12 @@ defmodule AlgoraWeb.SettingsLive do
|> assign(destinations: destinations)
|> assign(destination_form: to_form(destination_changeset))
|> assign(show_add_destination_modal: false)
|> assign(connected_with_restream: connected_with_restream)}
|> assign(stream_key: current_user.stream_key)
|> assign(connected_with_restream: connected_with_restream),
temporary_assigns: [
stream_url: "rtmp://#{rtmp_host}:#{Algora.config([:rtmp_port])}/#{Algora.config([:rtmp_path])}"
]
}
end
def handle_event("validate", %{"user" => params}, socket) do
@ -212,6 +295,24 @@ defmodule AlgoraWeb.SettingsLive do
end
end
def handle_event("regenerate_stream_key", _params, socket) do
case Accounts.gen_stream_key(socket.assigns.current_user) do
{:ok, user} ->
{:noreply,
socket
|> assign(stream_key: user.stream_key)
|> put_flash(:info, "Stream key regenerated!")}
{:error, _changeset} ->
{:noreply,
socket
|> put_flash(:error, "Failed to regenerate stream key!")}
end
end
def handle_event("copied_to_clipboard", %{ "notice" => notice }, socket) do
{:noreply, socket |> put_flash(:info, notice)}
end
def handle_params(params, _url, socket) do
{:noreply, socket |> apply_action(socket.assigns.live_action, params)}
end