diff --git a/lib/algora/pipeline.ex b/lib/algora/pipeline.ex index ce2431e..b837e62 100644 --- a/lib/algora/pipeline.ex +++ b/lib/algora/pipeline.ex @@ -34,34 +34,6 @@ defmodule Algora.Pipeline do def partial_segment_duration(), do: @partial_segment_duration_milliseconds - def handle_new_client(client_ref, app, stream_key) do - Membrane.Logger.info("Handling new client for pipeline #{app}") - params = %{ - client_ref: client_ref, - app: app, - stream_key: stream_key, - video_uuid: nil - } - - {:ok, pid} = with true <- Algora.config([:resume_rtmp]), - {pid, metadata} when is_pid(pid) <- :syn.lookup(:pipelines, stream_key) do - Algora.Pipeline.resume_rtmp(pid, %{ params | video_uuid: metadata[:video_uuid] }) - {:ok, pid} - else - _ -> - {:ok, _sup, pid} = - Membrane.Pipeline.start_link(Algora.Pipeline, params) - {:ok, pid} - end - - if Algora.config([:transcode]) do - # will send SetDataFrame message to pid - {Algora.Pipeline.ClientHandler, %{}, pid} - else - {Algora.Pipeline.ClientHandler, %{}} - end - end - def resume_rtmp(pipeline, params) when is_pid(pipeline) do Membrane.Logger.info("Resuming pipeline #{inspect(params)}") GenServer.call(pipeline, {:resume_rtmp, params}) @@ -330,12 +302,13 @@ defmodule Algora.Pipeline do {[], state} end - def handle_info(%Messages.SetDataFrame{} = message, _ctx, %{data_frame: nil} = state) do + + def handle_info({:metadata_message, %Messages.SetDataFrame{} = message}, _ctx, %{data_frame: nil} = state) do send(self(), :link_tracks) {[], %{ state | data_frame: message }} end - def handle_info(%Messages.SetDataFrame{} = _message, _ctx, state) do + def handle_info({:metadata_message, _message}, _ctx, state) do {[], state} end diff --git a/lib/algora/pipeline/client_handler.ex b/lib/algora/pipeline/client_handler.ex index 8059e60..f9eba3c 100644 --- a/lib/algora/pipeline/client_handler.ex +++ b/lib/algora/pipeline/client_handler.ex @@ -9,10 +9,11 @@ defmodule Algora.Pipeline.ClientHandler do defstruct [] @impl true - def handle_init(_opts) do + def handle_init(%{pipeline: pid}) do %{ source_pid: nil, - buffered: [] + buffered: [], + pipeline: pid } end @@ -51,6 +52,12 @@ defmodule Algora.Pipeline.ClientHandler do state end + @impl true + def handle_metadata(message, state) do + send(state.pipeline, message) + state + end + defp send_data(pid, payload) do send(pid, {:data, payload}) :ok diff --git a/lib/algora/pipeline/manager.ex b/lib/algora/pipeline/manager.ex index 01bfc3f..584b5dd 100644 --- a/lib/algora/pipeline/manager.ex +++ b/lib/algora/pipeline/manager.ex @@ -19,7 +19,7 @@ defmodule Algora.Pipeline.Manager do FLAME.place_child(Algora.Pipeline.Pool, {__MODULE__, [self(), params]}) end - {Algora.Pipeline.ClientHandler, %{}, pid} + {Algora.Pipeline.ClientHandler, %{pipeline: pid}} end def resume_rtmp(pipeline, params) when is_pid(pipeline) do diff --git a/mix.exs b/mix.exs index 6846cab..98ff5cc 100644 --- a/mix.exs +++ b/mix.exs @@ -61,7 +61,7 @@ defmodule Algora.MixProject do {:membrane_h264_ffmpeg_plugin, "~> 0.32.3"}, {:membrane_h265_ffmpeg_plugin, "~> 0.4.1"}, {:membrane_http_adaptive_stream_plugin, "~> 0.18.5"}, - {:membrane_rtmp_plugin, github: "lastcanal/membrane_rtmp_plugin", branch: "send_messages_to_pipeline"}, + {:membrane_rtmp_plugin, github: "lastcanal/membrane_rtmp_plugin", branch: "v0.27.2"}, {:membrane_tee_plugin, "~> 0.12.0"}, {:membrane_file_plugin, "~> 0.17.2"}, {:membrane_mp4_plugin, "~> 0.35.2"}, @@ -70,6 +70,7 @@ defmodule Algora.MixProject do {:membrane_ffmpeg_swscale_plugin, "~> 0.15.1"}, {:membrane_raw_video_parser_plugin, "~> 0.12.1"}, {:membrane_abr_transcoder_plugin, "~> 0.1.1"}, + {:membrane_aac_plugin, "~> 0.19.0", override: true}, {:mint, "~> 1.0"}, {:oban, "~> 2.16"}, {:open_api_spex, "~> 3.16"}, @@ -81,6 +82,7 @@ defmodule Algora.MixProject do {:phoenix_live_view, "~> 0.20.2"}, {:phoenix, "~> 1.7.11"}, {:plug_cowboy, "~> 2.5"}, + {:ratio, "~> 4.0.1", override: true}, {:replicate, "~> 1.2.0"}, {:reverse_proxy_plug, "~> 3.0"}, {:slugify, "~> 1.3"}, diff --git a/mix.lock b/mix.lock index bf5f610..65549a7 100644 --- a/mix.lock +++ b/mix.lock @@ -64,7 +64,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "0.1.4", "29563475afa9b8a2add1b7a9c8fb68d06ca7737648f28398e04461f008b69521", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f4ed47ecda66de70dd817698a703f8816daa91272e7e45812469498614ae8b29"}, "membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.9", "1bf4ee0e5fc5bae3eae1b8f2d100d71e41f2bfaaa366ac3c6267572f6bd71f24", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "aed8f58eac2c465d1e2731179f81969cb212990163c6fdf1aa27679658969be0"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, - "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.2", "7da3f73e60ad12178623379fd62e0a90e127d65541ab9c46f35e996642db4786", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "24060500f83b0697cccbe8528ba48c0fbc225b88c87057c413aa3bf5d0dcb6a6"}, + "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.19.0", "58a15efaaa4f2cc91b968464cfd269244e035efdd983aac2e3ddeb176fcf0585", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "eb7e786e650608ee205f4eebff4c1df3677e545acf09802458f77f64f9942fe9"}, "membrane_abr_transcoder_plugin": {:hex, :membrane_abr_transcoder_plugin, "0.1.1", "6b0a8abea0c30f986b37cbfe75a619c22d4cdaca11fbaeefc5b6a62cbf6c8d6b", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3", [hex: :typed_struct, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a4ee03d21fe15ee3146a8cc8b76462e4c6ab44122325c87a95d029a77ea3cf21"}, "membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"}, "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, @@ -93,7 +93,7 @@ "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"}, "membrane_raw_video_parser_plugin": {:hex, :membrane_raw_video_parser_plugin, "0.12.1", "fc0ac1f995411c3e3ccd93ac7ff8fe30930f8ff76d404b2f2a585d7efed6636f", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}], "hexpm", "bdc7859c9d576f59dd221cfa2a29940b4c58637b321279c23cb7c9e413436b65"}, "membrane_realtimer_plugin": {:hex, :membrane_realtimer_plugin, "0.9.0", "27210d5e32a5e8bfd101c41e4d8c1876e873a52cc129ebfbee4d0ccbea1cbd21", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b2e96d62135ee57ef9a5fdea94b3a9ab1198e5ea8ee248391b89c671125d1b51"}, - "membrane_rtmp_plugin": {:git, "https://github.com/lastcanal/membrane_rtmp_plugin.git", "944d7e524a37a65cd740151b469af76146f3d21c", [branch: "send_messages_to_pipeline"]}, + "membrane_rtmp_plugin": {:git, "https://github.com/lastcanal/membrane_rtmp_plugin.git", "c5ed3472f06081b903e3789436d9ab5255c550db", [branch: "v0.27.2"]}, "membrane_scissors_plugin": {:hex, :membrane_scissors_plugin, "0.8.0", "c8ee6d5b2d452d034d17a65a629bb13872dccb9fa9d232dc9bb85738d5723305", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:stream_split, "~> 0.1.3", [hex: :stream_split, repo: "hexpm", optional: false]}], "hexpm", "22487e6a4d45f4c85a50a93ff5c63e196175c04007365e3bb070d91b8e93ad02"}, "membrane_tee_plugin": {:hex, :membrane_tee_plugin, "0.12.0", "f94989b4080ef4b7937d74c1a14d3379577c7bd4c6d06e5a2bb41c351ad604d4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "0d61c9ed5e68e5a75d54200e1c6df5739c0bcb52fee0974183ad72446a179887"}, "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, @@ -134,7 +134,7 @@ "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "rambo": {:hex, :rambo, "0.3.4", "8962ac3bd1a633ee9d0e8b44373c7913e3ce3d875b4151dcd060886092d2dce7", [:mix], [], "hexpm", "0cc54ed089fbbc84b65f4b8a774224ebfe60e5c80186fafc7910b3e379ad58f1"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, - "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, + "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, "replicate": {:hex, :replicate, "1.2.0", "802d6826a89a11aded0d3586d6d0418db3c20590f5cc04727b9951e0efe566e9", [:mix], [{:httpoison, "~> 2.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6eeeef65de231784937b0f4c0336fe247caf5845930d2ee667d2b3ed5cec888d"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "reverse_proxy_plug": {:hex, :reverse_proxy_plug, "3.0.0", "caf347e3677e115b07e85a445b9d0d797199d8ef809abe7c2d51cccf17f2b12f", [:mix], [{:finch, "~> 0.18", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.2 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: false]}, {:req, "~> 0.3", [hex: :req, repo: "hexpm", optional: true]}, {:tesla, "~> 1.4", [hex: :tesla, repo: "hexpm", optional: true]}], "hexpm", "6056dc0f32cc42fa55d79abdd3b53f171c1ab1f6f7da26bd18b6bf58d70bdede"},