diff --git a/config/dev.exs b/config/dev.exs index b916257..8c72697 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -2,7 +2,8 @@ import Config config :algora, mode: :dev, - resume_rtmp: System.get_env("RESUME_RTMP", "false") == "true" + resume_rtmp: true, + transcode: System.get_env("TRANSCODE") config :algora, :buckets, media: System.get_env("BUCKET_MEDIA"), diff --git a/config/runtime.exs b/config/runtime.exs index bf64882..b468daf 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -14,7 +14,8 @@ end config :algora, hf_token: System.get_env("HF_TOKEN"), - resume_rtmp: System.get_env("RESUME_RTMP", "false") == "true" + resume_rtmp: true, + transcode: System.get_env("TRANSCODE") config :replicate, replicate_api_token: System.get_env("REPLICATE_API_TOKEN") diff --git a/lib/algora/pipeline.ex b/lib/algora/pipeline.ex index ed95927..aa692d2 100644 --- a/lib/algora/pipeline.ex +++ b/lib/algora/pipeline.ex @@ -52,7 +52,12 @@ defmodule Algora.Pipeline do {:ok, pid} end - Algora.Pipeline.ClientHandler + if Algora.config([:transcode]) do + # will send SetDataFrame message to pid + {Algora.Pipeline.ClientHandler, %{}, pid} + else + {Algora.Pipeline.ClientHandler, %{}} + end end def resume_rtmp(pipeline, params) when is_pid(pipeline) do @@ -124,7 +129,7 @@ defmodule Algora.Pipeline do }), ] - send(self(), :link_tracks) + unless Algora.config([:transcode]), do: send(self(), :link_tracks) {[spec: spec], state} else @@ -214,21 +219,54 @@ defmodule Algora.Pipeline do } end - def handle_info(:link_tracks, _ctx, state) do - spec = [ - # - get_child(:tee_video) - |> via_in(Pad.ref(:input, "video_master"), - options: [ - track_name: "video_master", - encoding: :H264, - segment_duration: @segment_duration, - partial_segment_duration: @partial_segment_duration - ] - ) - |> get_child(:sink), + def handle_info(:link_tracks, _ctx, %{reconnect: reconnect} = state) do + structure = if transcode = transcode_formats(state.data_frame) do + %{ framerate: source_framerate } = state.data_frame + Enum.map(transcode, fn(%{ framerate: framerate, height: height, width: width, track_name: track_name }) -> + # + get_child(:tee_video) + |> child(%Membrane.H264.Parser{ + output_stream_structure: :annexb, + generate_best_effort_timestamps: %{framerate: {trunc(source_framerate), @frame_devisor}} + }) + |> child(Membrane.H264.FFmpeg.Decoder) + |> child(%Membrane.FramerateConverter{framerate: {trunc(framerate), @frame_devisor}}) + |> child(%Membrane.FFmpeg.SWScale.Scaler{ + output_height: height, + output_width: width + }) + |> child(%Membrane.H264.FFmpeg.Encoder{ + preset: :ultrafast, + gop_size: trunc(framerate * 2), + }) + |> child({:video_reconnect, "#{track_name}-#{reconnect}"}, Membrane.Tee.Parallel) + |> via_in(Pad.ref(:input, track_name), + options: [ + track_name: track_name, + encoding: :H264, + segment_duration: @segment_duration, + partial_segment_duration: @partial_segment_duration + ] + ) + |> get_child(:sink) + end) + else + [ + # + get_child(:tee_video) + |> via_in(Pad.ref(:input, "video_master"), + options: [ + track_name: "video_master", + encoding: :H264, + segment_duration: @segment_duration, + partial_segment_duration: @partial_segment_duration + ] + ) + |> get_child(:sink) + ] + end - # + structure = [ get_child(:tee_audio) |> via_in(Pad.ref(:input, "audio_master"), options: [ @@ -239,8 +277,9 @@ defmodule Algora.Pipeline do ] ) |> get_child(:sink) - ] + ] ++ structure + spec = {structure, group: :hls_adaptive} {[spec: spec], state} end @@ -249,8 +288,15 @@ defmodule Algora.Pipeline do remove_link: {:funnel_video, Pad.ref(:input, reconnect)}, remove_link: {:funnel_audio, Pad.ref(:input, reconnect)}, remove_link: {:sink, Pad.ref(:input, "audio_master")}, - remove_link: {:sink, Pad.ref(:input, "video_master")}, - ] + ] ++ if Algora.config([:transcode]) do + Enum.map(transcode_formats(state.data_frame), fn(%{track_name: track_name}) -> + {:remove_link, {:sink, Pad.ref(:input, track_name)}} + end) + else + [ + remove_link: {:sink, Pad.ref(:input, "video_master")}, + ] + end {actions, state} end @@ -313,6 +359,15 @@ defmodule Algora.Pipeline do def handle_info(:reconnect_inactivity, _ctx, state), do: {[], state} + def handle_info(%Messages.SetDataFrame{} = message, _ctx, %{data_frame: nil} = state) do + if Algora.config([:transcode]), do: send(self(), :link_tracks) + {[], %{ state | data_frame: message }} + end + + def handle_info(%Messages.SetDataFrame{} = _message, _ctx, state) do + {[], state} + end + def handle_info(:terminate, _ctx, state) do Algora.Library.toggle_streamer_live(state.video, false) Membrane.Pipeline.terminate(self(), asynchronous?: true) @@ -375,4 +430,32 @@ defmodule Algora.Pipeline do :timer.cancel(timer) %{ state | terminate_timer: nil } end + + defp normalize_scale(scale) when is_float(scale), do: + scale |> trunc() |> normalize_scale() + defp normalize_scale(scale) when is_integer(scale) and scale > 0 do + if rem(scale, 2) == 1, do: scale - 1, else: scale + end + + defp transcode_formats(nil), do: nil + defp transcode_formats(%{height: source_height, width: source_width, framerate: source_framerate}) do + if transcode_slug = Algora.config([:transcode]) do + transcode = transcode_slug + |> String.split("|") + |> Enum.map(&String.split(&1, "p")) + |> Enum.map(fn([h, f]) -> {String.to_integer(h), String.to_integer(f)} end) + |> Enum.filter(fn({target_height, framerate}) -> + target_height <= source_height and framerate <= source_framerate + end) + + Enum.map(transcode, fn({target_height, target_framerate}) -> + height = normalize_scale(target_height) + width = normalize_scale(source_width / (source_height / target_height)) + framerate = trunc(target_framerate) + track_name = "video_#{width}x#{height}p#{framerate}" + %{ height: height, width: width, framerate: framerate, track_name: track_name } + end) + end + end + end