You've already forked algora-tv
							
							
				mirror of
				https://github.com/algora-io/tv.git
				synced 2025-10-30 23:07:56 +02:00 
			
		
		
		
	dbg
This commit is contained in:
		| @@ -117,6 +117,7 @@ defmodule Algora.Demuxer do | ||||
|     case Parser.parse_body(body) do | ||||
|       {:ok, packets, rest} -> | ||||
|         {actions, state} = get_actions(packets, state) | ||||
|         # dbg(actions) | ||||
|         {actions, %{state | partial: rest}} | ||||
|  | ||||
|       {:error, :not_enough_data} -> | ||||
|   | ||||
| @@ -8,7 +8,7 @@ defmodule Algora.Pipeline do | ||||
|  | ||||
|     spec = [ | ||||
|       # | ||||
|       child(:src, %Algora.SourceBin{ | ||||
|       child(:src, %Membrane.RTMP.SourceBin{ | ||||
|         socket: socket, | ||||
|         validator: %Algora.MessageValidator{video_id: video.id, pid: self()} | ||||
|       }), | ||||
| @@ -26,12 +26,12 @@ defmodule Algora.Pipeline do | ||||
|       # | ||||
|       get_child(:src) | ||||
|       |> via_out(:audio) | ||||
|       |> child(:tee_audio, Membrane.Tee.Master), | ||||
|       |> child(:tee_audio, Algora.Tee), | ||||
|  | ||||
|       # | ||||
|       get_child(:src) | ||||
|       |> via_out(:video) | ||||
|       |> child(:tee_video, Membrane.Tee.Master), | ||||
|       |> child(:tee_video, Algora.Tee), | ||||
|  | ||||
|       # | ||||
|       get_child(:tee_audio) | ||||
| @@ -50,7 +50,7 @@ defmodule Algora.Pipeline do | ||||
|       |> get_child(:sink) | ||||
|     ] | ||||
|  | ||||
|     {[spec: spec], %{socket: socket, video: video}} | ||||
|     {[spec: spec], %{socket: socket, video: video, native: nil}} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
| @@ -98,46 +98,63 @@ defmodule Algora.Pipeline do | ||||
|   def handle_info({:forward_rtmp, url, ref}, _ctx, state) do | ||||
|     spec = [ | ||||
|       # | ||||
|       child(ref, %Membrane.RTMP.Sink{rtmp_url: url}), | ||||
|       child(ref, %Algora.Sink{ | ||||
|         rtmp_url: url, | ||||
|         pid: self(), | ||||
|         native: state.native | ||||
|       }), | ||||
|  | ||||
|       # | ||||
|       get_child(:tee_audio) | ||||
|       |> via_out(:copy) | ||||
|       |> via_in(Pad.ref(:audio, 0)) | ||||
|       |> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000) | ||||
|       # |> via_in(Pad.ref(:audio, 0)) | ||||
|       |> get_child(ref), | ||||
|  | ||||
|       # | ||||
|       get_child(:tee_video) | ||||
|       |> via_out(:copy) | ||||
|       |> via_in(Pad.ref(:video, 0)) | ||||
|       |> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000) | ||||
|       # |> via_in(Pad.ref(:video, 0)) | ||||
|       |> get_child(ref) | ||||
|     ] | ||||
|  | ||||
|     {[spec: spec], state} | ||||
|   end | ||||
|  | ||||
|   def handle_info({:init, native}, _ctx, state) do | ||||
|     {[], %{state | native: native}} | ||||
|   end | ||||
|  | ||||
|   def handle_info(:multicast_algora, _ctx, state) do | ||||
|     user = Algora.Accounts.get_user_by!(handle: "algora") | ||||
|     destinations = Algora.Accounts.list_active_destinations(user.id) | ||||
|     send( | ||||
|       self(), | ||||
|       {:forward_rtmp, "rtmp://localhost:9006/3wactacTCNZIiUHa2EGSDnxvzBZHcFrh5IQ-czPZFXo", | ||||
|        String.to_atom("rtmp_sink_algora_0")} | ||||
|     ) | ||||
|  | ||||
|     for {destination, i} <- Enum.with_index(destinations) do | ||||
|       url = | ||||
|         URI.new!(destination.rtmp_url) | ||||
|         |> URI.append_path("/" <> destination.stream_key) | ||||
|         |> URI.to_string() | ||||
|     # user = Algora.Accounts.get_user_by!(handle: "algora") | ||||
|     # destinations = Algora.Accounts.list_active_destinations(user.id) | ||||
|  | ||||
|       send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_algora_#{i}")}) | ||||
|     end | ||||
|     # for {destination, i} <- Enum.with_index(destinations) do | ||||
|     #   url = | ||||
|     #     URI.new!(destination.rtmp_url) | ||||
|     #     |> URI.append_path("/" <> destination.stream_key) | ||||
|     #     |> URI.to_string() | ||||
|     # | ||||
|     # send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_algora_#{i}")}) | ||||
|     # end | ||||
|  | ||||
|     if url = Algora.Accounts.get_restream_ws_url(user) do | ||||
|       Task.Supervisor.start_child( | ||||
|         Algora.TaskSupervisor, | ||||
|         fn -> Algora.Restream.Websocket.start_link(%{url: url, video: state.video}) end, | ||||
|         restart: :transient | ||||
|       ) | ||||
|     end | ||||
|     # if url = Algora.Accounts.get_restream_ws_url(user) do | ||||
|     #   Task.Supervisor.start_child( | ||||
|     #     Algora.TaskSupervisor, | ||||
|     #     fn -> Algora.Restream.Websocket.start_link(%{url: url, video: state.video}) end, | ||||
|     #     restart: :transient | ||||
|     #   ) | ||||
|     # end | ||||
|  | ||||
|     {[], state} | ||||
|     # {[stream_format: {pad, %AAC{config: {:audio_specific_config, packet.payload}}}], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   | ||||
							
								
								
									
										385
									
								
								lib/algora/sink.ex
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										385
									
								
								lib/algora/sink.ex
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,385 @@ | ||||
| defmodule Algora.Sink do | ||||
|   @moduledoc """ | ||||
|   Membrane element being client-side of RTMP streams. | ||||
|   It needs to receive at least one of: video stream in H264 format or audio in AAC format. | ||||
|   Currently it supports only: | ||||
|     - RTMP proper - "plain" RTMP protocol | ||||
|     - RTMPS - RTMP over TLS/SSL | ||||
|   other RTMP variants - RTMPT, RTMPE, RTMFP are not supported. | ||||
|   Implementation based on FFmpeg. | ||||
|   """ | ||||
|  | ||||
|   # Buffer `handle_stream_format` and `handle_buffer` actions | ||||
|   # until connection has been established | ||||
|  | ||||
|   use Membrane.Sink | ||||
|  | ||||
|   require Membrane.{H264, Logger} | ||||
|  | ||||
|   alias Membrane.RTMP.Sink.Native | ||||
|   alias Membrane.{AAC, Buffer, H264} | ||||
|  | ||||
|   @supported_protocols ["rtmp://", "rtmps://"] | ||||
|   @connection_attempt_interval 500 | ||||
|   @type track_type :: :audio | :video | ||||
|  | ||||
|   def_input_pad(:audio, | ||||
|     availability: :on_request, | ||||
|     accepted_format: AAC, | ||||
|     flow_control: :manual, | ||||
|     demand_unit: :buffers | ||||
|   ) | ||||
|  | ||||
|   def_input_pad(:video, | ||||
|     availability: :on_request, | ||||
|     accepted_format: %H264{stream_structure: structure} when H264.is_avc(structure), | ||||
|     flow_control: :manual, | ||||
|     demand_unit: :buffers | ||||
|   ) | ||||
|  | ||||
|   def_options( | ||||
|     rtmp_url: [ | ||||
|       spec: String.t(), | ||||
|       description: """ | ||||
|       Destination URL of the stream. It needs to start with rtmp:// or rtmps:// depending on the protocol variant. | ||||
|       This URL should be provided by your streaming service. | ||||
|       """ | ||||
|     ], | ||||
|     max_attempts: [ | ||||
|       spec: pos_integer() | :infinity, | ||||
|       default: 1, | ||||
|       description: """ | ||||
|       Maximum number of connection attempts before failing with an error. | ||||
|       The attempts will happen every #{@connection_attempt_interval} ms | ||||
|       """ | ||||
|     ], | ||||
|     tracks: [ | ||||
|       spec: [track_type()], | ||||
|       default: [:audio, :video], | ||||
|       description: """ | ||||
|       A list of tracks, which will be sent. Can be `:audio`, `:video` or both. | ||||
|       """ | ||||
|     ], | ||||
|     pid: [spec: pid()], | ||||
|     native: [spec: any(), optional: true] | ||||
|   ) | ||||
|  | ||||
|   @impl true | ||||
|   def handle_init(_ctx, options) do | ||||
|     dbg({:handle_init, options}) | ||||
|  | ||||
|     unless String.starts_with?(options.rtmp_url, @supported_protocols) do | ||||
|       raise ArgumentError, "Invalid destination URL provided" | ||||
|     end | ||||
|  | ||||
|     unless options.max_attempts == :infinity or | ||||
|              (is_integer(options.max_attempts) and options.max_attempts >= 1) do | ||||
|       raise ArgumentError, "Invalid max_attempts option value: #{options.max_attempts}" | ||||
|     end | ||||
|  | ||||
|     options = %{options | tracks: Enum.uniq(options.tracks)} | ||||
|  | ||||
|     unless length(options.tracks) > 0 and | ||||
|              Enum.all?(options.tracks, &Kernel.in(&1, [:audio, :video])) do | ||||
|       raise ArgumentError, "All track have to be either :audio or :video" | ||||
|     end | ||||
|  | ||||
|     single_track? = true | ||||
|     # single_track? = length(options.tracks) == 1 | ||||
|     frame_buffer = Enum.map(options.tracks, &{Pad.ref(&1, 0), nil}) |> Enum.into(%{}) | ||||
|  | ||||
|     dbg(single_track?: single_track?) | ||||
|     dbg(frame_buffer: frame_buffer) | ||||
|  | ||||
|     state = | ||||
|       options | ||||
|       |> Map.from_struct() | ||||
|       |> Map.merge(%{ | ||||
|         attempts: 0, | ||||
|         native: nil, | ||||
|         # Keys here are the pad names. | ||||
|         frame_buffer: frame_buffer, | ||||
|         ready?: false, | ||||
|         # Activated when one of the source inputs gets closed. Interleaving is | ||||
|         # disabled, frame buffer is flushed and from that point buffers on the | ||||
|         # remaining pad are simply forwarded to the output. | ||||
|         # Always on if a single track is connected | ||||
|         forward_mode?: single_track?, | ||||
|         video_base_dts: nil, | ||||
|         count: 0, | ||||
|         skip_until_keyframe: true | ||||
|       }) | ||||
|  | ||||
|     {[], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_setup(_ctx, state) do | ||||
|     dbg({:handle_setup}) | ||||
|     audio? = :audio in state.tracks | ||||
|     video? = :video in state.tracks | ||||
|  | ||||
|     {:ok, native} = Native.create(state.rtmp_url, audio?, video?) | ||||
|  | ||||
|     state | ||||
|     |> Map.put(:native, native) | ||||
|     |> try_connect() | ||||
|     |> then(&{[], &1}) | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_playing(_ctx, state) do | ||||
|     dbg({:handle_playing}) | ||||
|     {build_demand(state), state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_pad_added(Pad.ref(_type, stream_id), _ctx, _state) when stream_id != 0, | ||||
|     do: raise(ArgumentError, message: "Stream id must always be 0") | ||||
|  | ||||
|   @impl true | ||||
|   def handle_pad_added(_pad, _ctx, state) do | ||||
|     {[], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_stream_format( | ||||
|         Pad.ref(:video, 0), | ||||
|         %H264{width: width, height: height, stream_structure: {_avc, dcr}} = format, | ||||
|         _ctx, | ||||
|         state | ||||
|       ) do | ||||
|     dbg({:handle_stream_format, format}) | ||||
|  | ||||
|     case Native.init_video_stream(state.native, width, height, dcr) do | ||||
|       {:ok, ready?, native} -> | ||||
|         Membrane.Logger.debug("Correctly initialized video stream.") | ||||
|         send(state.pid, {:init, native}) | ||||
|         {[], %{state | native: native, ready?: ready?}} | ||||
|  | ||||
|       {:error, :stream_format_resent} -> | ||||
|         Membrane.Logger.warning( | ||||
|           "Input stream format redefined on pad :video. RTMP Sink does not support dynamic stream parameters" | ||||
|         ) | ||||
|  | ||||
|         {[], state} | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_stream_format(Pad.ref(:audio, 0), %Membrane.AAC{} = stream_format, _ctx, state) do | ||||
|     profile = AAC.profile_to_aot_id(stream_format.profile) | ||||
|     sr_index = AAC.sample_rate_to_sampling_frequency_id(stream_format.sample_rate) | ||||
|     channel_configuration = AAC.channels_to_channel_config_id(stream_format.channels) | ||||
|     frame_length_id = AAC.samples_per_frame_to_frame_length_id(stream_format.samples_per_frame) | ||||
|  | ||||
|     aac_config = | ||||
|       <<profile::5, sr_index::4, channel_configuration::4, frame_length_id::1, 0::1, 0::1>> | ||||
|  | ||||
|     case Native.init_audio_stream( | ||||
|            state.native, | ||||
|            stream_format.channels, | ||||
|            stream_format.sample_rate, | ||||
|            aac_config | ||||
|          ) do | ||||
|       {:ok, ready?, native} -> | ||||
|         Membrane.Logger.debug("Correctly initialized audio stream.") | ||||
|         {[], %{state | native: native, ready?: ready?}} | ||||
|  | ||||
|       {:error, :stream_format_resent} -> | ||||
|         Membrane.Logger.warning( | ||||
|           "Input stream format redefined on pad :audio. RTMP Sink does not support dynamic stream parameters" | ||||
|         ) | ||||
|  | ||||
|         {[], state} | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   # def handle_buffer(Pad.ref(:video, 0) = pad, buffer, ctx, %{skip_until_keyframe: true} = state) do | ||||
|   #   if buffer.metadata.h264.key_frame? do | ||||
|   #     dbg("keyframe detected") | ||||
|   #     handle_buffer(pad, buffer, ctx, %{state | skip_until_keyframe: false}) | ||||
|   #   else | ||||
|   #     dbg("skipping non keyframe") | ||||
|   #     {[demand: pad], state} | ||||
|   #   end | ||||
|   # end | ||||
|  | ||||
|   # def handle_buffer(Pad.ref(:audio, 0) = pad, _buffer, _ctx, %{skip_until_keyframe: true} = state) do | ||||
|   #   dbg("skipping non keyframe") | ||||
|   #   {[demand: pad], state} | ||||
|   # end | ||||
|  | ||||
|   def handle_buffer(pad, buffer, _ctx, %{ready?: false} = state) do | ||||
|     # state = | ||||
|     #   if state.count < 10 do | ||||
|     #     dbg({:handle_buffer, buffer}) | ||||
|     #     %{state | count: state.count + 1} | ||||
|     #   else | ||||
|     #     state | ||||
|     #   end | ||||
|  | ||||
|     {[], fill_frame_buffer(state, pad, buffer)} | ||||
|   end | ||||
|  | ||||
|   def handle_buffer(pad, buffer, _ctx, %{forward_mode?: true} = state) do | ||||
|     # state = | ||||
|     #   if state.count < 10 do | ||||
|     #     dbg({:handle_buffer, buffer}) | ||||
|     #     %{state | count: state.count + 1} | ||||
|     #   else | ||||
|     #     state | ||||
|     #   end | ||||
|  | ||||
|     {[demand: pad], write_frame(state, pad, buffer)} | ||||
|   end | ||||
|  | ||||
|   def handle_buffer(pad, buffer, _ctx, state) do | ||||
|     state | ||||
|     |> fill_frame_buffer(pad, buffer) | ||||
|     |> write_frame_interleaved() | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_start_of_stream(pad, _ctx, state) do | ||||
|     dbg({:handle_start_of_stream, pad}) | ||||
|     {[], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_end_of_stream(Pad.ref(type, 0), _ctx, state) do | ||||
|     if state.forward_mode? do | ||||
|       Native.finalize_stream(state.native) | ||||
|       {[], state} | ||||
|     else | ||||
|       # The interleave logic does not work if either one of the inputs does not | ||||
|       # produce buffers. From this point on we act as a "forward" filter. | ||||
|       other_pad = | ||||
|         case type do | ||||
|           :audio -> :video | ||||
|           :video -> :audio | ||||
|         end | ||||
|         |> then(&Pad.ref(&1, 0)) | ||||
|  | ||||
|       state = flush_frame_buffer(state) | ||||
|       {[demand: other_pad], %{state | forward_mode?: true}} | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   defp try_connect(%{attempts: attempts, max_attempts: max_attempts} = state) | ||||
|        when max_attempts != :infinity and attempts >= max_attempts do | ||||
|     raise "failed to connect to '#{state.rtmp_url}' #{attempts} times, aborting" | ||||
|   end | ||||
|  | ||||
|   defp try_connect(state) do | ||||
|     dbg({:try_connect}) | ||||
|     state = %{state | attempts: state.attempts + 1} | ||||
|  | ||||
|     case Native.try_connect(state.native) do | ||||
|       :ok -> | ||||
|         Membrane.Logger.debug("Correctly initialized connection with: #{state.rtmp_url}") | ||||
|  | ||||
|         state | ||||
|  | ||||
|       {:error, error} when error in [:econnrefused, :etimedout] -> | ||||
|         Membrane.Logger.warning( | ||||
|           "Connection to #{state.rtmp_url} refused, retrying in #{@connection_attempt_interval}ms" | ||||
|         ) | ||||
|  | ||||
|         Process.sleep(@connection_attempt_interval) | ||||
|  | ||||
|         try_connect(state) | ||||
|  | ||||
|       {:error, reason} -> | ||||
|         raise "failed to connect to '#{state.rtmp_url}': #{inspect(reason)}" | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   defp build_demand(%{frame_buffer: frame_buffer}) do | ||||
|     frame_buffer | ||||
|     |> Enum.filter(fn {_pad, buffer} -> buffer == nil end) | ||||
|     |> Enum.map(fn {pad, _buffer} -> {:demand, pad} end) | ||||
|   end | ||||
|  | ||||
|   defp fill_frame_buffer(state, pad, buffer) do | ||||
|     if get_in(state, [:frame_buffer, pad]) == nil do | ||||
|       put_in(state, [:frame_buffer, pad], buffer) | ||||
|     else | ||||
|       raise "attempted to overwrite frame buffer on pad #{inspect(pad)}" | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   defp write_frame_interleaved( | ||||
|          %{ | ||||
|            frame_buffer: %{Pad.ref(:audio, 0) => audio, Pad.ref(:video, 0) => video} | ||||
|          } = state | ||||
|        ) | ||||
|        when audio == nil or video == nil do | ||||
|     # We still have to wait for the other frame. | ||||
|     {[], state} | ||||
|   end | ||||
|  | ||||
|   defp write_frame_interleaved(%{frame_buffer: frame_buffer} = state) do | ||||
|     {pad, buffer} = | ||||
|       Enum.min_by(frame_buffer, fn {_pad, buffer} -> | ||||
|         buffer | ||||
|         |> Buffer.get_dts_or_pts() | ||||
|         |> Ratio.ceil() | ||||
|       end) | ||||
|  | ||||
|     state = | ||||
|       state | ||||
|       |> write_frame(pad, buffer) | ||||
|       |> put_in([:frame_buffer, pad], nil) | ||||
|  | ||||
|     {build_demand(state), state} | ||||
|   end | ||||
|  | ||||
|   defp flush_frame_buffer(%{frame_buffer: frame_buffer} = state) do | ||||
|     pads_with_buffer = | ||||
|       frame_buffer | ||||
|       |> Enum.filter(fn {_pad, buffer} -> buffer != nil end) | ||||
|       |> Enum.sort(fn {_, left}, {_, right} -> | ||||
|         Buffer.get_dts_or_pts(left) <= Buffer.get_dts_or_pts(right) | ||||
|       end) | ||||
|  | ||||
|     Enum.reduce(pads_with_buffer, state, fn {pad, buffer}, state -> | ||||
|       state | ||||
|       |> write_frame(pad, buffer) | ||||
|       |> put_in([:frame_buffer, pad], nil) | ||||
|     end) | ||||
|   end | ||||
|  | ||||
|   defp write_frame(state, Pad.ref(:audio, 0), buffer) do | ||||
|     buffer_pts = Ratio.ceil(buffer.pts) | ||||
|  | ||||
|     case Native.write_audio_frame(state.native, buffer.payload, buffer_pts) do | ||||
|       {:ok, native} -> | ||||
|         Map.put(state, :native, native) | ||||
|  | ||||
|       {:error, reason} -> | ||||
|         raise "writing audio frame failed with reason: #{inspect(reason)}" | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   defp write_frame(state, Pad.ref(:video, 0), buffer) do | ||||
|     dts = buffer.dts || buffer.pts | ||||
|     pts = buffer.pts || buffer.dts | ||||
|     {base_dts, state} = Bunch.Map.get_updated!(state, :video_base_dts, &(&1 || dts)) | ||||
|  | ||||
|     case Native.write_video_frame( | ||||
|            state.native, | ||||
|            buffer.payload, | ||||
|            dts - base_dts, | ||||
|            pts - base_dts, | ||||
|            buffer.metadata.h264.key_frame? | ||||
|          ) do | ||||
|       {:ok, native} -> | ||||
|         Map.put(state, :native, native) | ||||
|  | ||||
|       {:error, reason} -> | ||||
|         raise "writing video frame failed with reason: #{inspect(reason)}" | ||||
|     end | ||||
|   end | ||||
| end | ||||
							
								
								
									
										75
									
								
								lib/algora/tee.ex
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								lib/algora/tee.ex
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,75 @@ | ||||
| defmodule Algora.Tee do | ||||
|   @moduledoc """ | ||||
|   Element for forwarding buffers to at least one output pad | ||||
|  | ||||
|   It has one input pad `:input` and 2 output pads: | ||||
|   * `:master` - is a static pad which is always available and works in pull mode | ||||
|   * `:copy` - is a dynamic pad that can be linked to any number of elements (including 0) and works in push mode | ||||
|  | ||||
|   The `:master` pad dictates the speed of processing data and any element (or elements) connected to `:copy` pad | ||||
|   will receive the same data as `:master` | ||||
|   """ | ||||
|   use Membrane.Filter | ||||
|  | ||||
|   def_input_pad(:input, | ||||
|     availability: :always, | ||||
|     flow_control: :auto, | ||||
|     accepted_format: _any | ||||
|   ) | ||||
|  | ||||
|   def_output_pad(:master, | ||||
|     availability: :always, | ||||
|     flow_control: :auto, | ||||
|     accepted_format: _any | ||||
|   ) | ||||
|  | ||||
|   def_output_pad(:copy, | ||||
|     availability: :on_request, | ||||
|     flow_control: :push, | ||||
|     accepted_format: _any | ||||
|   ) | ||||
|  | ||||
|   @impl true | ||||
|   def handle_init(_ctx, _opts) do | ||||
|     {[], %{accepted_format: nil, count: 0}} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_stream_format(_pad, accepted_format, _ctx, state) do | ||||
|     {[forward: accepted_format], %{state | accepted_format: accepted_format}} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_pad_added(Pad.ref(:copy, _ref), _ctx, %{accepted_format: nil} = state) do | ||||
|     {[], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_pad_added( | ||||
|         Pad.ref(:copy, _ref) = pad, | ||||
|         _ctx, | ||||
|         %{accepted_format: accepted_format} = state | ||||
|       ) do | ||||
|     dbg({:tee, :actions, [stream_format: {pad, accepted_format}]}) | ||||
|     {[stream_format: {pad, accepted_format}], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do | ||||
|     # state = | ||||
|     #   if state.count < 10 do | ||||
|     #     dbg(buffer) | ||||
|     #     %{state | count: state.count + 1} | ||||
|     #   else | ||||
|     #     state | ||||
|     #   end | ||||
|  | ||||
|     {[forward: buffer], state} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def handle_start_of_stream(pad, _ctx, state) do | ||||
|     dbg({:tee, :handle_start_of_stream, pad}) | ||||
|     {[], state} | ||||
|   end | ||||
| end | ||||
| @@ -5,6 +5,8 @@ end | ||||
| defimpl Membrane.RTMP.MessageValidator, for: Algora.MessageValidator do | ||||
|   @impl true | ||||
|   def validate_connect(impl, message) do | ||||
|     dbg({:validate_connect, {impl, message}}) | ||||
|  | ||||
|     {:ok, video} = | ||||
|       Algora.Library.reconcile_livestream( | ||||
|         %Algora.Library.Video{id: impl.video_id}, | ||||
| @@ -22,6 +24,8 @@ defimpl Membrane.RTMP.MessageValidator, for: Algora.MessageValidator do | ||||
|       send(impl.pid, {:forward_rtmp, url, String.to_atom("rtmp_sink_#{i}")}) | ||||
|     end | ||||
|  | ||||
|     # send(impl.pid, {:forward_rtmp, nil, String.to_atom("rtmp_sink_algora")}) | ||||
|  | ||||
|     user = Algora.Accounts.get_user!(video.user_id) | ||||
|  | ||||
|     if url = Algora.Accounts.get_restream_ws_url(user) do | ||||
| @@ -36,17 +40,20 @@ defimpl Membrane.RTMP.MessageValidator, for: Algora.MessageValidator do | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def validate_release_stream(_impl, _message) do | ||||
|   def validate_release_stream(impl, message) do | ||||
|     dbg({:validate_release_stream, {impl, message}}) | ||||
|     {:ok, "release stream success"} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def validate_publish(_impl, _message) do | ||||
|   def validate_publish(impl, message) do | ||||
|     dbg({:validate_publish, {impl, message}}) | ||||
|     {:ok, "validate publish success"} | ||||
|   end | ||||
|  | ||||
|   @impl true | ||||
|   def validate_set_data_frame(_impl, _message) do | ||||
|   def validate_set_data_frame(impl, message) do | ||||
|     dbg({:validate_set_data_frame, {impl, message}}) | ||||
|     {:ok, "set data frame success"} | ||||
|   end | ||||
| end | ||||
|   | ||||
		Reference in New Issue
	
	Block a user