From 5be1baabce789fd52ccec0ae92ab0fac7bec571b Mon Sep 17 00:00:00 2001 From: ty Date: Tue, 8 Oct 2024 21:29:32 -0400 Subject: [PATCH] reconnect rtmp to any node via syn --- config/dev.exs | 4 ++-- lib/algora/application.ex | 2 ++ lib/algora/pipeline.ex | 7 +++---- mix.exs | 1 + mix.lock | 1 + 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/config/dev.exs b/config/dev.exs index 3fce04f..79e177c 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -62,7 +62,7 @@ config :algora, Algora.Repo.Local, config :algora, AlgoraWeb.Endpoint, # Binding to loopback ipv4 address prevents access from other machines. # Change to `ip: {0, 0, 0, 0}` to allow access from other machines. - http: [ip: {0, 0, 0, 0}, port: 4000], + http: [ip: {0, 0, 0, 0}, port: String.to_integer(System.get_env("PORT") || "4000") ], debug_errors: true, code_reloader: true, check_origin: false, @@ -74,7 +74,7 @@ config :algora, AlgoraWeb.Endpoint, config :algora, AlgoraWeb.Embed.Endpoint, # Binding to loopback ipv4 address prevents access from other machines. # Change to `ip: {0, 0, 0, 0}` to allow access from other machines. - http: [ip: {0, 0, 0, 0}, port: 4001] + http: [ip: {0, 0, 0, 0}, port: String.to_integer(System.get_env("EMBED_PORT") || "4001")] # ## SSL Support # diff --git a/lib/algora/application.ex b/lib/algora/application.ex index adc0cf0..419a394 100644 --- a/lib/algora/application.ex +++ b/lib/algora/application.ex @@ -20,6 +20,8 @@ defmodule Algora.Application do handle_new_client: &Algora.Pipeline.handle_new_client/3 } + :ok = :syn.add_node_to_scopes([:pipelines]) + children = [ Algora.Env, {Cluster.Supervisor, [topologies, [name: Algora.ClusterSupervisor]]}, diff --git a/lib/algora/pipeline.ex b/lib/algora/pipeline.ex index b70e875..371d511 100644 --- a/lib/algora/pipeline.ex +++ b/lib/algora/pipeline.ex @@ -3,7 +3,6 @@ defmodule Algora.Pipeline do require Membrane.Logger alias Membrane.Time - alias Membrane.RTMP.Messages alias Algora.{Admin, Library} alias Algora.Pipeline.HLS.LLController @@ -44,8 +43,8 @@ defmodule Algora.Pipeline do } {:ok, pid} = with true <- Algora.config([:resume_rtmp]), - [{pid, {:pipeline, video_uuid}}] <- Registry.lookup(Algora.Pipeline.Registry, stream_key) do - Algora.Pipeline.resume_rtmp(pid, %{params | video_uuid: video_uuid}) + {pid, metadata} when is_pid(pid) <- :syn.lookup(:pipelines, stream_key) do + Algora.Pipeline.resume_rtmp(pid, %{ params | video_uuid: metadata[:video_uuid] }) {:ok, pid} else _ -> @@ -73,7 +72,7 @@ defmodule Algora.Pipeline do video = Library.init_livestream!() dir = Path.join(Admin.tmp_dir(), video.uuid) - {:ok, _} = Registry.register(Algora.Pipeline.Registry, stream_key, {:pipeline, video.uuid}) + :ok = :syn.register(:pipelines, stream_key, self(), [video_uuid: video.uuid]) :rpc.multicall(LLController, :start, [video.uuid, dir]) {:ok, video} = diff --git a/mix.exs b/mix.exs index 5bb9ba2..dfde272 100644 --- a/mix.exs +++ b/mix.exs @@ -65,6 +65,7 @@ defmodule Algora.MixProject do {:membrane_funnel_plugin, "~> 0.9.1"}, {:membrane_framerate_converter_plugin, "~> 0.8.2"}, {:membrane_ffmpeg_swscale_plugin, "~> 0.15.1"}, + {:syn, "~> 3.3"}, {:mint, "~> 1.0"}, {:oban, "~> 2.16"}, {:open_api_spex, "~> 3.16"}, diff --git a/mix.lock b/mix.lock index e274af4..c036af5 100644 --- a/mix.lock +++ b/mix.lock @@ -139,6 +139,7 @@ "stream_split": {:hex, :stream_split, "0.1.7", "2d3fd1fd21697da7f91926768d65f79409086052c9ec7ae593987388f52425f8", [:mix], [], "hexpm", "1dc072ff507a64404a0ad7af90df97096183fee8eeac7b300320cea7c4679147"}, "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, "swoosh": {:hex, :swoosh, "1.8.2", "af9a22ab2c0d20b266f61acca737fa11a121902de9466a39e91bacdce012101c", [:mix], [{:cowboy, "~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:finch, "~> 0.6", [hex: :finch, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13 or ~> 1.0", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d058ba750eafadb6c09a84a352c14c5d1eeeda6e84945fcc95785b7f3067b7db"}, + "syn": {:hex, :syn, "3.3.0", "4684a909efdfea35ce75a9662fc523e4a8a4e8169a3df275e4de4fa63f99c486", [:rebar3], [], "hexpm", "e58ee447bc1094bdd21bf0acc102b1fbf99541a508cd48060bf783c245eaf7d6"}, "tailwind": {:hex, :tailwind, "0.2.2", "9e27288b568ede1d88517e8c61259bc214a12d7eed271e102db4c93fcca9b2cd", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "ccfb5025179ea307f7f899d1bb3905cd0ac9f687ed77feebc8f67bdca78565c4"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"},