mirror of
https://github.com/algora-io/tv.git
synced 2024-11-16 00:58:59 +02:00
improve restream websocket reliability with dynamic token refresh (#68)
* try to reconnect using fresh token * add handlers to start, restart and terminate
This commit is contained in:
parent
0ed72eda3d
commit
a235c36888
@ -132,7 +132,13 @@ defmodule Algora.Pipeline do
|
||||
if url = Algora.Accounts.get_restream_ws_url(user) do
|
||||
Task.Supervisor.start_child(
|
||||
Algora.TaskSupervisor,
|
||||
fn -> Algora.Restream.Websocket.start_link(%{url: url, video: state.video}) end,
|
||||
fn ->
|
||||
Algora.Restream.Websocket.start_link(%{
|
||||
url: url,
|
||||
user: user,
|
||||
video: state.video
|
||||
})
|
||||
end,
|
||||
restart: :transient
|
||||
)
|
||||
end
|
||||
|
@ -27,7 +27,7 @@ defimpl Membrane.RTMP.MessageValidator, for: Algora.Pipeline.MessageValidator do
|
||||
if url = Algora.Accounts.get_restream_ws_url(user) do
|
||||
Task.Supervisor.start_child(
|
||||
Algora.TaskSupervisor,
|
||||
fn -> Algora.Restream.Websocket.start_link(%{url: url, video: video}) end,
|
||||
fn -> Algora.Restream.Websocket.start_link(%{url: url, user: user, video: video}) end,
|
||||
restart: :transient
|
||||
)
|
||||
end
|
||||
|
@ -2,21 +2,38 @@ defmodule Algora.Restream.Websocket do
|
||||
use WebSockex
|
||||
require Logger
|
||||
|
||||
alias Algora.{Accounts, Chat}
|
||||
alias Algora.{Accounts, Chat, Library}
|
||||
|
||||
def start_link(%{url: url, video: video} = opts) do
|
||||
retry = opts[:retry] || 0
|
||||
def start_link(%{url: url, video: video, user: user} = opts) do
|
||||
restart = opts[:restart] || 0
|
||||
Logger.info("Starting WebSocket: #{video.id} (restart: #{restart}, retry: #{retry})")
|
||||
|
||||
Logger.info("Starting WebSocket: #{video.id} (restart: #{restart})")
|
||||
|
||||
WebSockex.start_link(url, __MODULE__, %{
|
||||
url: url,
|
||||
video: video,
|
||||
user: user,
|
||||
restart: restart,
|
||||
retry: retry
|
||||
terminated: false
|
||||
})
|
||||
end
|
||||
|
||||
def start_link(video_id) do
|
||||
video = Library.get_video!(video_id)
|
||||
user = Accounts.get_user!(video.user_id)
|
||||
url = Accounts.get_restream_ws_url(user)
|
||||
|
||||
start_link(%{url: url, video: video, user: user})
|
||||
end
|
||||
|
||||
def handle_info(:restart, state) do
|
||||
{:close, state}
|
||||
end
|
||||
|
||||
def handle_info(:terminate, state) do
|
||||
{:close, %{state | terminated: true}}
|
||||
end
|
||||
|
||||
def handle_frame({:text, msg}, state) do
|
||||
case Jason.decode(msg) do
|
||||
{:ok, %{"action" => "event", "payload" => payload}} ->
|
||||
@ -33,40 +50,40 @@ defmodule Algora.Restream.Websocket do
|
||||
end
|
||||
|
||||
def handle_connect(_conn, state) do
|
||||
if state.restart == 0 and state.retry == 0 do
|
||||
Logger.info("WebSocket connected: #{state.video.id}")
|
||||
if state.restart == 0 do
|
||||
Logger.info("WebSocket connected: #{state.video.id} (pid: #{:erlang.pid_to_list(self())})")
|
||||
else
|
||||
Logger.info(
|
||||
"WebSocket reconnected: #{state.video.id} (restart: #{state.restart}, retry: #{state.retry})"
|
||||
"WebSocket reconnected: #{state.video.id} (pid: #{:erlang.pid_to_list(self())}, restart: #{state.restart})"
|
||||
)
|
||||
end
|
||||
|
||||
{:ok, state}
|
||||
{:ok, %{state | restart: 0}}
|
||||
end
|
||||
|
||||
def handle_disconnect(reason, state) do
|
||||
Logger.error("WebSocket disconnected: #{state.video.id} (reason: #{inspect(reason)})")
|
||||
|
||||
# latest_video = Library.get_latest_video(Accounts.get_user!(state.video.user_id))
|
||||
# socket_video = Library.get_video!(state.video.id)
|
||||
# if socket_video.id == latest_video.id and socket_video.is_live and state.retry < 10 do
|
||||
|
||||
if state.retry < 3 do
|
||||
:timer.sleep(:timer.seconds(min(2 ** state.retry, 60)))
|
||||
state = %{state | retry: state.retry + 1}
|
||||
{:reconnect, state}
|
||||
if state.terminated do
|
||||
Logger.info("Terminating WebSocket: #{state.video.id}")
|
||||
else
|
||||
Logger.info("Restarting WebSocket: #{state.video.id}")
|
||||
:timer.sleep(:timer.seconds(min(2 ** state.restart, 60)))
|
||||
state = %{state | restart: state.restart + 1, retry: 0}
|
||||
|
||||
Task.Supervisor.start_child(
|
||||
Algora.TaskSupervisor,
|
||||
fn -> Algora.Restream.Websocket.start_link(state) end,
|
||||
restart: :transient
|
||||
)
|
||||
|
||||
{:ok, state}
|
||||
restart_socket(state)
|
||||
end
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
defp restart_socket(state) do
|
||||
url = Accounts.get_restream_ws_url(state.user)
|
||||
state = %{state | restart: state.restart + 1, url: url || state.url}
|
||||
|
||||
Task.Supervisor.start_child(
|
||||
Algora.TaskSupervisor,
|
||||
fn -> Algora.Restream.Websocket.start_link(state) end,
|
||||
restart: :transient
|
||||
)
|
||||
end
|
||||
|
||||
defp handle_payload(%{"eventPayload" => %{"contentModifiers" => %{"whisper" => true}}}, state) do
|
||||
|
Loading…
Reference in New Issue
Block a user