1
0
mirror of https://github.com/algora-io/tv.git synced 2024-11-26 01:00:20 +02:00

reorganize pipeline components

This commit is contained in:
zafer 2024-06-23 14:53:18 +03:00
parent 239d4d93ec
commit 4988ba7a8d
7 changed files with 146 additions and 143 deletions

View File

@ -8,9 +8,9 @@ defmodule Algora.Pipeline do
spec = [
#
child(:src, %Algora.SourceBin{
child(:src, %Algora.Pipeline.SourceBin{
socket: socket,
validator: %Algora.MessageValidator{video_id: video.id, pid: self()}
validator: %Algora.Pipeline.MessageValidator{video_id: video.id, pid: self()}
}),
#
@ -20,7 +20,7 @@ defmodule Algora.Pipeline do
manifest_module: Membrane.HTTPAdaptiveStream.HLS,
target_window_duration: :infinity,
persist?: false,
storage: %Algora.Storage{video: video}
storage: %Algora.Pipeline.Storage{video: video}
}),
#

View File

@ -0,0 +1,9 @@
# Algora Media Processing Pipeline
The code in this directory is built upon the work from the [Membrane Framework](https://github.com/membraneframework). Their efforts provided a robust starting point, and we have made modifications and additions to better suit our project’s objectives.
We would like to explicitly acknowledge and thank the authors and maintainers of the Membrane Framework for making their work available to the community under an open source license. They have laid the groundwork that enabled us to build and innovate further.
## License
This subdirectory, as with the rest of the codebase, is licensed under the terms of the [AGPLv3 License](https://github.com/algora-io/tv/blob/main/LICENSE). The original codebase and plugins can be found [here](https://github.com/membraneframework/membrane_core), which are licensed under the [Apache License 2.0](https://github.com/membraneframework/membrane_core/blob/master/LICENSE).

View File

@ -1,4 +1,4 @@
defmodule Algora.Demuxer do
defmodule Algora.Pipeline.Demuxer do
@moduledoc """
Element for demuxing FLV streams into audio and video streams.
FLV format supports only one video and audio stream.

View File

@ -1,4 +1,4 @@
defmodule Algora.SourceBin do
defmodule Algora.Pipeline.SourceBin do
@moduledoc """
Bin responsible for demuxing and parsing an RTMP stream.
@ -69,7 +69,7 @@ defmodule Algora.SourceBin do
validator: opts.validator,
use_ssl?: opts.use_ssl?
})
|> child(:demuxer, Algora.Demuxer),
|> child(:demuxer, Algora.Pipeline.Demuxer),
#
child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :none

View File

@ -0,0 +1,130 @@
defmodule Algora.Pipeline.Storage do
@behaviour Membrane.HTTPAdaptiveStream.Storage
require Membrane.Logger
alias Algora.Library
@pubsub Algora.PubSub
@enforce_keys [:video]
defstruct @enforce_keys ++ [video_header: <<>>, video_segment: <<>>, setup_completed?: false]
@type t :: %__MODULE__{
video: Library.Video.t(),
video_header: <<>>,
video_segment: <<>>,
setup_completed?: boolean()
}
@impl true
def init(%__MODULE__{} = config), do: config
@impl true
def store(
parent_id,
name,
contents,
metadata,
ctx,
%{video: video} = state
) do
path = "#{video.uuid}/#{name}"
with {t, {:ok, _}} <- :timer.tc(&Algora.Storage.upload/3, [contents, path, upload_opts(ctx)]),
{:ok, state} <- process_contents(parent_id, name, contents, metadata, ctx, state) do
size = :erlang.byte_size(contents) / 1_000
time = t / 1_000
region = System.get_env("FLY_REGION") || "local"
case ctx do
%{type: :segment} ->
Membrane.Logger.info(
"Uploaded #{Float.round(size, 1)} kB in #{Float.round(time, 1)} ms (#{Float.round(size / time, 1)} MB/s, #{region})"
)
_ ->
nil
end
{:ok, state}
else
{:error, reason} = err ->
Membrane.Logger.error("Failed to upload #{path}: #{reason}")
{err, state}
end
end
defp upload_opts(%{type: :manifest} = _ctx) do
[
content_type: "application/x-mpegURL",
cache_control: "no-cache, no-store, private"
]
end
defp upload_opts(%{type: :segment} = _ctx) do
[content_type: "video/mp4"]
end
defp upload_opts(_ctx), do: []
@impl true
def remove(_parent_id, _name, _ctx, state) do
{{:error, :not_implemented}, state}
end
defp process_contents(
:video,
_name,
contents,
_metadata,
%{type: :header, mode: :binary},
state
) do
{:ok, %{state | video_header: contents}}
end
defp process_contents(
:video,
_name,
contents,
_metadata,
%{type: :segment, mode: :binary},
%{setup_completed?: false, video: video, video_header: video_header} = state
) do
Task.Supervisor.start_child(Algora.TaskSupervisor, fn ->
with {:ok, video} <- Library.store_thumbnail(video, video_header <> contents),
{:ok, video} <- Library.store_og_image(video) do
broadcast_thumbnails_generated!(video)
else
_ ->
Membrane.Logger.error("Could not generate thumbnails for video #{video.id}")
end
end)
{:ok, %{state | setup_completed?: true, video_segment: contents}}
end
defp process_contents(
:video,
_name,
contents,
_metadata,
%{type: :segment, mode: :binary},
state
) do
{:ok, %{state | video_segment: contents}}
end
defp process_contents(_parent_id, _name, _contents, _metadata, _ctx, state) do
{:ok, state}
end
defp broadcast!(topic, msg) do
Phoenix.PubSub.broadcast!(@pubsub, topic, {__MODULE__, msg})
end
defp broadcast_thumbnails_generated!(video) do
broadcast!(Library.topic_livestreams(), %Library.Events.ThumbnailsGenerated{video: video})
end
end

View File

@ -1,4 +1,4 @@
defmodule Algora.MessageValidator do
defmodule Algora.Pipeline.MessageValidator do
defstruct [:video_id, :pid]
end

View File

@ -1,139 +1,11 @@
defmodule Algora.Storage do
@behaviour Membrane.HTTPAdaptiveStream.Storage
require Membrane.Logger
alias Algora.Library
@pubsub Algora.PubSub
@enforce_keys [:video]
defstruct @enforce_keys ++ [video_header: <<>>, video_segment: <<>>, setup_completed?: false]
@type t :: %__MODULE__{
video: Library.Video.t(),
video_header: <<>>,
video_segment: <<>>,
setup_completed?: boolean()
}
@impl true
def init(%__MODULE__{} = config), do: config
@impl true
def store(
parent_id,
name,
contents,
metadata,
ctx,
%{video: video} = state
) do
path = "#{video.uuid}/#{name}"
with {t, {:ok, _}} <- :timer.tc(&upload/3, [contents, path, upload_opts(ctx)]),
{:ok, state} <- process_contents(parent_id, name, contents, metadata, ctx, state) do
size = :erlang.byte_size(contents) / 1_000
time = t / 1_000
region = System.get_env("FLY_REGION") || "local"
case ctx do
%{type: :segment} ->
Membrane.Logger.info(
"Uploaded #{Float.round(size, 1)} kB in #{Float.round(time, 1)} ms (#{Float.round(size / time, 1)} MB/s, #{region})"
)
_ ->
nil
end
{:ok, state}
else
{:error, reason} = err ->
Membrane.Logger.error("Failed to upload #{path}: #{reason}")
{err, state}
end
end
def endpoint_url do
%{scheme: scheme, host: host} = Application.fetch_env!(:ex_aws, :s3) |> Enum.into(%{})
"#{scheme}#{host}"
end
def upload_regions do
[System.get_env("FLY_REGION") || "fra", "sjc", "fra"]
|> Enum.uniq()
|> Enum.join(",")
end
defp upload_opts(%{type: :manifest} = _ctx) do
[
content_type: "application/x-mpegURL",
cache_control: "no-cache, no-store, private"
]
end
defp upload_opts(%{type: :segment} = _ctx) do
[content_type: "video/mp4"]
end
defp upload_opts(_ctx), do: []
@impl true
def remove(_parent_id, _name, _ctx, state) do
{{:error, :not_implemented}, state}
end
defp process_contents(
:video,
_name,
contents,
_metadata,
%{type: :header, mode: :binary},
state
) do
{:ok, %{state | video_header: contents}}
end
defp process_contents(
:video,
_name,
contents,
_metadata,
%{type: :segment, mode: :binary},
%{setup_completed?: false, video: video, video_header: video_header} = state
) do
Task.Supervisor.start_child(Algora.TaskSupervisor, fn ->
with {:ok, video} <- Library.store_thumbnail(video, video_header <> contents),
{:ok, video} <- Library.store_og_image(video) do
broadcast_thumbnails_generated!(video)
else
_ ->
Membrane.Logger.error("Could not generate thumbnails for video #{video.id}")
end
end)
{:ok, %{state | setup_completed?: true, video_segment: contents}}
end
defp process_contents(
:video,
_name,
contents,
_metadata,
%{type: :segment, mode: :binary},
state
) do
{:ok, %{state | video_segment: contents}}
end
defp process_contents(_parent_id, _name, _contents, _metadata, _ctx, state) do
{:ok, state}
end
def upload_to_bucket(contents, remote_path, bucket, opts \\ []) do
op = Algora.config([:buckets, bucket]) |> ExAws.S3.put_object(remote_path, contents, opts)
# op = %{op | headers: op.headers |> Map.merge(%{"x-tigris-regions" => upload_regions()})}
ExAws.request(op, [])
end
@ -181,12 +53,4 @@ defmodule Algora.Storage do
err -> err
end
end
defp broadcast!(topic, msg) do
Phoenix.PubSub.broadcast!(@pubsub, topic, {__MODULE__, msg})
end
defp broadcast_thumbnails_generated!(video) do
broadcast!(Library.topic_livestreams(), %Library.Events.ThumbnailsGenerated{video: video})
end
end