1
0
mirror of https://github.com/algora-io/tv.git synced 2025-10-30 23:07:56 +02:00

add multistreaming support (#35)

* replace SourceBin with Source + Demuxer + Parser

* add Tee.Master

* update topology

* add copy channel

* reorganize spec

* fix pad refs

* dynamically update pipeline topology for each rtmp sink

* implement user defined destinations

* validate destination URL

* revamp ui

* remove dbg calls

* remove autocomplete for stream key input

* update validation message
This commit is contained in:
Zafer Cesur
2024-05-20 15:58:24 +03:00
committed by GitHub
parent c20bd5e9f9
commit 9ac9b08176
8 changed files with 260 additions and 36 deletions

View File

@@ -3,7 +3,7 @@ defmodule Algora.Accounts do
import Ecto.Changeset
alias Algora.Repo
alias Algora.Accounts.{User, Identity}
alias Algora.Accounts.{User, Identity, Destination}
def list_users(opts) do
Repo.all(from u in User, limit: ^Keyword.fetch!(opts, :limit))
@@ -129,4 +129,31 @@ defmodule Algora.Accounts do
{:ok, user}
end
def list_destinations(user_id) do
Repo.all(from d in Destination, where: d.user_id == ^user_id)
end
def list_active_destinations(user_id) do
Repo.all(from d in Destination, where: d.user_id == ^user_id and d.active == true)
end
def get_destination!(id), do: Repo.get!(Destination, id)
def change_destination(%Destination{} = destination, attrs \\ %{}) do
destination |> Destination.changeset(attrs)
end
def create_destination(user, attrs \\ %{}) do
%Destination{}
|> Destination.changeset(attrs)
|> put_change(:user_id, user.id)
|> Repo.insert()
end
def update_destination(%Destination{} = destination, attrs) do
destination
|> Destination.changeset(attrs)
|> Repo.update()
end
end

View File

@@ -0,0 +1,45 @@
defmodule Algora.Accounts.Destination do
use Ecto.Schema
import Ecto.Changeset
schema "destinations" do
field :rtmp_url, :string
field :stream_key, :string, redact: true
field :active, :boolean, default: true
belongs_to :user, Algora.Accounts.User
timestamps()
end
def changeset(destination, attrs) do
destination
|> cast(attrs, [:rtmp_url, :stream_key, :active])
|> validate_required([:rtmp_url, :stream_key])
|> validate_rtmp_url()
end
defp validate_rtmp_url(changeset) do
validate_change(changeset, :rtmp_url, fn :rtmp_url, rtmp_url ->
case valid_rtmp_url?(rtmp_url) do
:ok ->
[]
{:error, message} ->
[rtmp_url: message]
end
end)
end
defp valid_rtmp_url?(url) do
case URI.parse(url) do
%URI{scheme: scheme, host: host} when scheme in ["rtmp", "rtmps"] ->
case :inet.gethostbyname(to_charlist(host)) do
{:ok, _} -> :ok
{:error, _} -> {:error, "must be a valid URL"}
end
_ ->
{:error, "must be a valid URL starting with rtmp:// or rtmps://"}
end
end
end

View File

@@ -527,6 +527,8 @@ defmodule Algora.Library do
set: [user_id: user.id, title: user.channel_tagline, visibility: user.visibility]
)
video = get_video!(video.id)
case result do
{1, _} -> {:ok, video}
_ -> {:error, :invalid}

View File

@@ -7,16 +7,14 @@ defmodule Algora.Pipeline do
video = Library.init_livestream!()
spec = [
# audio
#
child(:src, %Membrane.RTMP.SourceBin{
socket: socket,
validator: %Algora.MessageValidator{video_id: video.id}
})
|> via_out(:audio)
|> via_in(Pad.ref(:input, :audio),
options: [encoding: :AAC, segment_duration: Membrane.Time.seconds(2)]
)
|> child(:sink, %Membrane.HTTPAdaptiveStream.SinkBin{
validator: %Algora.MessageValidator{video_id: video.id, pid: self()}
}),
#
child(:sink, %Membrane.HTTPAdaptiveStream.SinkBin{
mode: :live,
manifest_module: Membrane.HTTPAdaptiveStream.HLS,
target_window_duration: :infinity,
@@ -24,10 +22,28 @@ defmodule Algora.Pipeline do
storage: %Algora.Storage{video: video}
}),
# video
#
get_child(:src)
|> via_out(:audio)
|> child(:tee_audio, Membrane.Tee.Master),
#
get_child(:src)
|> via_out(:video)
|> via_in(Pad.ref(:input, :video),
|> child(:tee_video, Membrane.Tee.Master),
#
get_child(:tee_audio)
|> via_out(:master)
|> via_in(Pad.ref(:input, :audio_sink),
options: [encoding: :AAC, segment_duration: Membrane.Time.seconds(2)]
)
|> get_child(:sink),
#
get_child(:tee_video)
|> via_out(:master)
|> via_in(Pad.ref(:input, :video_sink),
options: [encoding: :H264, segment_duration: Membrane.Time.seconds(2)]
)
|> get_child(:sink)
@@ -71,6 +87,27 @@ defmodule Algora.Pipeline do
{[], state}
end
def handle_info({:forward_rtmp, url, ref}, _ctx, state) do
spec = [
#
child(ref, %Membrane.RTMP.Sink{rtmp_url: url}),
#
get_child(:tee_audio)
|> via_out(:copy)
|> via_in(Pad.ref(:audio, 0))
|> get_child(ref),
#
get_child(:tee_video)
|> via_out(:copy)
|> via_in(Pad.ref(:video, 0))
|> get_child(ref)
]
{[spec: spec], state}
end
@impl true
def handle_call(:get_video_id, _ctx, state) do
{[{:reply, state.video.id}], state}

View File

@@ -1,5 +1,5 @@
defmodule Algora.MessageValidator do
defstruct [:video_id]
defstruct [:video_id, :pid]
end
defimpl Membrane.RTMP.MessageValidator, for: Algora.MessageValidator do
@@ -12,6 +12,18 @@ defimpl Membrane.RTMP.MessageValidator, for: Algora.MessageValidator do
)
Algora.Library.toggle_streamer_live(video, true)
destinations = Algora.Accounts.list_active_destinations(video.user_id)
for {destination, i} <- Enum.with_index(destinations) do
url =
URI.new!(destination.rtmp_url)
|> URI.append_path("/" <> destination.stream_key)
|> URI.to_string()
send(impl.pid, {:forward_rtmp, url, String.to_atom("rtmp_sink_#{i}")})
end
{:ok, "connect success"}
end

View File

@@ -2,38 +2,91 @@ defmodule AlgoraWeb.SettingsLive do
use AlgoraWeb, :live_view
alias Algora.Accounts
alias Algora.Accounts.Destination
def render(assigns) do
~H"""
<div class="max-w-3xl mx-auto bg-gray-800/50 rounded-lg p-4">
<.header class="pb-6">
Settings
<:subtitle>
Update your account details
</:subtitle>
</.header>
<div class="space-y-6">
<.header>
Settings
<:subtitle>
Update your account details
</:subtitle>
</.header>
<.simple_form for={@form} phx-change="validate" phx-submit="save">
<.input field={@form[:handle]} label="Handle" />
<.input field={@form[:name]} label="Name" />
<.input label="Email" name="email" value={@current_user.email} disabled />
<.input field={@form[:channel_tagline]} label="Stream tagline" />
<div>
<.input
label="Stream URL"
name="stream_url"
value={"rtmp://#{URI.parse(AlgoraWeb.Endpoint.url()).host}:#{Algora.config([:rtmp_port])}/#{@current_user.stream_key}"}
disabled
/>
<p class="mt-2 text-sm text-gray-400">
<%= "Paste into OBS Studio > File > Settings > Stream > Server" %>
</p>
<.simple_form for={@form} phx-change="validate" phx-submit="save">
<.input field={@form[:handle]} label="Handle" />
<.input field={@form[:name]} label="Name" />
<.input label="Email" name="email" value={@current_user.email} disabled />
<.input field={@form[:channel_tagline]} label="Stream tagline" />
<div>
<.input
label="Stream URL"
name="stream_url"
value={"rtmp://#{URI.parse(AlgoraWeb.Endpoint.url()).host}:#{Algora.config([:rtmp_port])}/#{@current_user.stream_key}"}
disabled
/>
<p class="mt-2 text-sm text-gray-400">
<%= "Paste into OBS Studio > File > Settings > Stream > Server" %>
</p>
</div>
<:actions>
<.button>Save</.button>
</:actions>
</.simple_form>
</div>
<div class="mt-12 space-y-6">
<.header>
Multistreaming
<:subtitle>
Stream to multiple destinations
</:subtitle>
</.header>
<div class="space-y-6">
<ul :if={length(@destinations) > 0} class="space-y-2">
<%= for destination <- @destinations do %>
<li class="w-full py-2 px-3 border border-gray-600 bg-gray-950 rounded-md shadow-sm focus:outline-none focus:ring-gray-900 focus:border-gray-900 flex items-center justify-between">
<span><%= destination.rtmp_url %></span>
<label class="inline-flex items-center cursor-pointer">
<span class="mr-3 text-sm font-medium text-gray-900 dark:text-gray-300">
<%= if destination.active do %>
Active
<% else %>
Inactive
<% end %>
</span>
<input
type="checkbox"
value=""
class="sr-only peer"
checked={destination.active}
phx-value-id={destination.id}
phx-click="toggle_destination"
/>
<div class="relative w-11 h-6 bg-gray-200 rounded-full peer dark:bg-gray-700 peer-focus:ring-4 peer-focus:ring-purple-300 dark:peer-focus:ring-purple-800 peer-checked:after:translate-x-full rtl:peer-checked:after:-translate-x-full peer-checked:after:border-white after:content-[''] after:absolute after:top-0.5 after:start-[2px] after:bg-white after:border-gray-300 after:border after:rounded-full after:h-5 after:w-5 after:transition-all dark:border-gray-600 peer-checked:bg-purple-600">
</div>
</label>
</li>
<% end %>
</ul>
<.button phx-click="show_add_destination_modal">Add Destination</.button>
</div>
</div>
</div>
<!-- Add Destination Modal -->
<.modal :if={@show_add_destination_modal} id="add-destination-modal" show>
<.header>
Add Destination
</.header>
<.simple_form for={@destination_form} phx-submit="add_destination">
<.input field={@destination_form[:rtmp_url]} label="RTMP URL" />
<.input field={@destination_form[:stream_key]} label="Stream key" autocomplete="off" />
<:actions>
<.button>Save</.button>
<.button>Add Destination</.button>
</:actions>
</.simple_form>
</div>
</.modal>
"""
end
@@ -48,11 +101,16 @@ defmodule AlgoraWeb.SettingsLive do
end
changeset = Accounts.change_settings(current_user, %{})
destinations = Accounts.list_destinations(current_user.id)
destination_changeset = Accounts.change_destination(%Destination{})
{:ok,
socket
|> assign(current_user: current_user)
|> assign_form(changeset)}
|> assign_form(changeset)
|> assign(destinations: destinations)
|> assign(destination_form: to_form(destination_changeset))
|> assign(show_add_destination_modal: false)}
end
def handle_event("validate", %{"user" => params}, socket) do
@@ -77,6 +135,32 @@ defmodule AlgoraWeb.SettingsLive do
end
end
def handle_event("toggle_destination", %{"id" => id}, socket) do
destination = Accounts.get_destination!(id)
Accounts.update_destination(destination, %{active: !destination.active})
{:noreply,
assign(socket, :destinations, Accounts.list_destinations(socket.assigns.current_user.id))}
end
def handle_event("show_add_destination_modal", _params, socket) do
{:noreply, assign(socket, show_add_destination_modal: true)}
end
def handle_event("add_destination", %{"destination" => destination_params}, socket) do
case Accounts.create_destination(socket.assigns.current_user, destination_params) do
{:ok, _destination} ->
{:noreply,
socket
|> assign(:show_add_destination_modal, false)
|> assign(:destinations, Accounts.list_destinations(socket.assigns.current_user.id))
|> put_flash(:info, "Destination added successfully!")}
{:error, changeset} ->
{:noreply, assign(socket, destination_form: to_form(changeset))}
end
end
def handle_params(params, _url, socket) do
{:noreply, socket |> apply_action(socket.assigns.live_action, params)}
end

View File

@@ -57,6 +57,7 @@ defmodule Algora.MixProject do
{:membrane_core, "~> 1.0"},
{:membrane_http_adaptive_stream_plugin, "~> 0.18.0"},
{:membrane_rtmp_plugin, "~> 0.20.0"},
{:membrane_tee_plugin, "~> 0.12.0"},
{:mint, "~> 1.0"},
{:oban, "~> 2.16"},
{:phoenix_ecto, "~> 4.4"},

View File

@@ -0,0 +1,16 @@
defmodule Algora.Repo.Local.Migrations.CreateDestinations do
use Ecto.Migration
def change do
create table(:destinations) do
add :rtmp_url, :string, null: false
add :stream_key, :string, null: false
add :active, :boolean, default: true, null: false
add :user_id, references(:users, on_delete: :delete_all), null: false
timestamps()
end
create index(:destinations, [:user_id])
end
end