1
0
mirror of https://github.com/algora-io/tv.git synced 2025-02-14 01:59:50 +02:00

relay chat messages from other platforms (#38)

This commit is contained in:
Zafer Cesur 2024-05-23 14:03:57 +03:00 committed by GitHub
parent d7c75f36c5
commit c83a45add4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 311 additions and 25 deletions

View File

@ -3,7 +3,7 @@ defmodule Algora.Accounts do
import Ecto.Changeset
alias Algora.{Repo, Restream}
alias Algora.Accounts.{User, Identity, Destination}
alias Algora.Accounts.{User, Identity, Destination, Entity}
def list_users(opts) do
Repo.all(from u in User, limit: ^Keyword.fetch!(opts, :limit))
@ -154,8 +154,24 @@ defmodule Algora.Accounts do
Repo.one!(from(i in Identity, where: i.user_id == ^user.id and i.provider == "restream"))
{:ok, tokens} = Restream.refresh_access_token(identity.provider_refresh_token)
update_restream_tokens(user, tokens)
{:ok, tokens}
end
def get_restream_token(%User{} = user) do
query = from(i in Identity, where: i.user_id == ^user.id and i.provider == "restream")
with identity when identity != nil <- Repo.one(query),
{:ok, %{token: token}} <- refresh_restream_tokens(user) do
token
else
_ -> nil
end
end
def get_restream_ws_url(%User{} = user) do
if token = get_restream_token(user), do: Restream.websocket_url(token)
end
def gen_stream_key(%User{} = user) do
@ -205,4 +221,42 @@ defmodule Algora.Accounts do
|> Destination.changeset(attrs)
|> Repo.update()
end
def create_entity!(%User{} = user) do
create_entity!(%{
user_id: user.id,
name: user.name,
handle: user.handle,
avatar_url: user.avatar_url,
platform: "algora",
platform_id: Integer.to_string(user.id),
platform_meta: %{}
})
end
def create_entity!(attrs) do
%Entity{}
|> Entity.changeset(attrs)
|> Repo.insert!()
end
def get_entity!(id), do: Repo.get!(Entity, id)
def get_entity(id), do: Repo.get(Entity, id)
def get_entity_by(fields), do: Repo.get_by(Entity, fields)
def get_or_create_entity!(%User{} = user) do
case get_entity_by(user_id: user.id) do
nil -> create_entity!(user)
entity -> entity
end
end
def get_or_create_entity!(%{platform: platform, platform_id: platform_id} = attrs) do
case get_entity_by(platform: platform, platform_id: platform_id) do
nil -> create_entity!(attrs)
entity -> entity
end
end
end

View File

@ -0,0 +1,34 @@
defmodule Algora.Accounts.Entity do
use Ecto.Schema
import Ecto.Changeset
alias Algora.Accounts.User
schema "entities" do
field :name, :string
field :handle, :string
field :avatar_url, :string
field :platform, :string
field :platform_id, :string
field :platform_meta, :map, default: %{}
belongs_to :user, User
timestamps()
end
@doc false
def changeset(entity, attrs) do
entity
|> cast(attrs, [
:user_id,
:name,
:handle,
:avatar_url,
:platform,
:platform_id,
:platform_meta
])
|> validate_required([:handle, :platform, :platform_id, :platform_meta])
end
end

View File

@ -2,7 +2,7 @@ defmodule Algora.Accounts.User do
use Ecto.Schema
import Ecto.Changeset
alias Algora.Accounts.{User, Identity}
alias Algora.Accounts.{User, Identity, Entity}
schema "users" do
field :email, :string
@ -30,6 +30,7 @@ defmodule Algora.Accounts.User do
end
has_many :identities, Identity
has_one :entity, Entity
timestamps()
end

View File

@ -5,7 +5,7 @@ defmodule Algora.Chat do
import Ecto.Query, warn: false
alias Algora.Library.Video
alias Algora.Accounts.User
alias Algora.Accounts.{User, Entity}
alias Algora.{Repo, Accounts}
alias Algora.Chat.{Message, Events}
@ -19,13 +19,19 @@ defmodule Algora.Chat do
def list_messages(%Video{} = video) do
# TODO: add limit
from(m in Message,
join: u in User,
join: e in Entity,
on: m.entity_id == e.id,
left_join: u in User,
on: m.user_id == u.id,
join: v in Video,
on: m.video_id == v.id,
join: c in User,
on: c.id == v.user_id,
select_merge: %{sender_handle: u.handle, channel_id: c.id},
select_merge: %{
platform: e.platform,
sender_handle: coalesce(u.handle, e.handle),
channel_id: c.id
},
where: m.video_id == ^video.id
)
|> order_by_inserted(:asc)
@ -42,26 +48,43 @@ defmodule Algora.Chat do
def get_message!(id) do
from(m in Message,
join: u in User,
join: e in Entity,
on: m.entity_id == e.id,
left_join: u in User,
on: m.user_id == u.id,
join: v in Video,
on: m.video_id == v.id,
join: c in User,
on: c.id == v.user_id,
select_merge: %{sender_handle: u.handle, channel_id: c.id},
select_merge: %{
platform: e.platform,
sender_handle: coalesce(u.handle, e.handle),
channel_id: c.id
},
where: m.id == ^id
)
|> Repo.one!()
end
def create_message(%User{} = user, %Video{} = video, attrs \\ %{}) do
def create_message(%User{} = user, %Video{} = video, attrs) do
entity = Accounts.get_or_create_entity!(user)
%Message{}
|> Message.changeset(attrs)
|> Message.put_entity(entity)
|> Message.put_user(user)
|> Message.put_video(video)
|> Repo.insert()
end
def create_message(%Entity{} = entity, %Video{} = video, attrs) do
%Message{}
|> Message.changeset(attrs)
|> Message.put_entity(entity)
|> Message.put_video(video)
|> Repo.insert()
end
def update_message(%Message{} = message, attrs) do
message
|> Message.changeset(attrs)

View File

@ -1,13 +1,15 @@
defmodule Algora.Chat.Message do
use Ecto.Schema
alias Algora.Accounts.User
alias Algora.Accounts.{User, Entity}
alias Algora.Library.Video
import Ecto.Changeset
schema "messages" do
field :body, :string
field :platform, :string, virtual: true
field :sender_handle, :string, virtual: true
field :channel_id, :integer, virtual: true
belongs_to :entity, Entity
belongs_to :user, User
belongs_to :video, Video
@ -21,6 +23,10 @@ defmodule Algora.Chat.Message do
|> validate_required([:body])
end
def put_entity(%Ecto.Changeset{} = changeset, %Entity{} = entity) do
put_assoc(changeset, :entity, entity)
end
def put_user(%Ecto.Changeset{} = changeset, %User{} = user) do
put_assoc(changeset, :user, user)
end

View File

@ -66,6 +66,8 @@ defmodule Algora.Pipeline do
@impl true
def handle_child_notification(:end_of_stream, _element, _ctx, state) do
Algora.Library.toggle_streamer_live(state.video, false)
# TODO: gracefully terminate open connections (e.g. RTMP, WS)
{[], state}
end

View File

@ -11,6 +11,8 @@ defmodule Algora.Restream do
"https://api.restream.io/login?#{query}"
end
def websocket_url(token), do: "wss://chat.api.restream.io/ws?accessToken=#{token}"
def exchange_access_token(opts) do
code = Keyword.fetch!(opts, :code)
state = Keyword.fetch!(opts, :state)

View File

@ -0,0 +1,89 @@
defmodule Algora.Restream.Websocket do
use WebSockex
require Logger
alias Algora.{Accounts, Chat}
def start_link(%{url: url, video: video}) do
WebSockex.start_link(url, __MODULE__, %{url: url, video: video})
end
def handle_frame({:text, msg}, state) do
case Jason.decode(msg) do
{:ok, action} ->
handle_action(action, state)
{:error, _reason} ->
Logger.error("Failed to parse message: #{msg}")
{:ok, state}
end
end
def handle_disconnect(_reason, state) do
Logger.error("WebSocket disconnected: #{state.video.id}")
{:reconnect, state}
end
defp handle_action(
%{
"action" => "event",
"payload" => %{
"eventPayload" => %{
"author" =>
%{
"name" => handle,
"displayName" => name,
"avatar" => avatar_url,
"id" => platform_id
} = author,
"bot" => false,
"contentModifiers" => %{"whisper" => false},
"text" => body
}
}
} = action,
state
) do
entity =
Accounts.get_or_create_entity!(%{
name: name,
handle: handle,
avatar_url: avatar_url,
platform: get_platform(action),
platform_id: platform_id,
platform_meta: author
})
case Chat.create_message(entity, state.video, %{body: body}) do
{:ok, message} ->
# HACK:
message = Chat.get_message!(message.id)
Chat.broadcast_message_sent!(message)
_error ->
Logger.error("Failed to persist message: #{inspect(action)}")
end
{:ok, state}
end
defp handle_action(action, state) do
Logger.info("Received message: #{inspect(action)}")
{:ok, state}
end
defp get_platform(%{"payload" => %{"connectionIdentifier" => identifier}}) do
parts = String.split(identifier, "-")
case parts do
[_prefix, platform, _suffix] ->
platform
_ ->
Logger.error("Failed to extract platform: #{identifier}")
"unknown"
end
end
defp get_platform(_action), do: "unknown"
end

View File

@ -24,6 +24,16 @@ defimpl Membrane.RTMP.MessageValidator, for: Algora.MessageValidator do
send(impl.pid, {:forward_rtmp, url, String.to_atom("rtmp_sink_#{i}")})
end
user = Algora.Accounts.get_user!(video.user_id)
if url = Algora.Accounts.get_restream_ws_url(user) do
Task.Supervisor.start_child(
Algora.TaskSupervisor,
fn -> Algora.Restream.Websocket.start_link(%{url: url, video: video}) end,
restart: :transient
)
end
{:ok, "connect success"}
end

View File

@ -4,6 +4,7 @@ defmodule AlgoraWeb.ChatLive do
alias Algora.{Accounts, Library, Storage, Chat}
alias AlgoraWeb.{LayoutComponent, Presence}
alias AlgoraWeb.RTMPDestinationIconComponent
def render(assigns) do
assigns = assigns |> assign(tabs: [:chat])
@ -76,6 +77,11 @@ defmodule AlgoraWeb.ChatLive do
class="text-sm break-words flex-1 scrollbar-thin overflow-y-auto inset-0 h-[400px] py-4"
>
<div :for={{id, message} <- @streams.messages} id={id} class="px-4">
<RTMPDestinationIconComponent.icon
:if={message.platform != "algora"}
class="inline-flex w-5 h-5 shrink-0 mr-0.5"
icon={String.to_atom(message.platform)}
/>
<span class={"font-semibold #{if(system_message?(message), do: "text-emerald-400", else: "text-indigo-400")}"}>
<%= message.sender_handle %>:
</span>

View File

@ -39,6 +39,21 @@ defmodule AlgoraWeb.SettingsLive do
</:actions>
</.simple_form>
</div>
<div class="space-y-6 bg-white/5 rounded-lg p-6 ring-1 ring-white/15">
<.header>
Integrations
<:subtitle>
Link other apps
</:subtitle>
</.header>
<div class="space-y-6">
<.button>
<.link href={"/oauth/login/restream?#{URI.encode_query(return_to: "/channel/settings")}"}>
Connect with Restream
</.link>
</.button>
</div>
</div>
<div class="space-y-6 bg-white/5 rounded-lg p-6 ring-1 ring-white/15">
<.header>
Multistreaming
@ -82,21 +97,6 @@ defmodule AlgoraWeb.SettingsLive do
<.button phx-click="show_add_destination_modal">Add Destination</.button>
</div>
</div>
<div :if={@env == :dev} class="space-y-6 bg-white/5 rounded-lg p-6 ring-1 ring-white/15">
<.header>
Integrations
<:subtitle>
Connect with other apps
</:subtitle>
</.header>
<div class="space-y-6">
<.button>
<.link href={"/oauth/login/restream?#{URI.encode_query(return_to: "/channel/settings")}"}>
Restream
</.link>
</.button>
</div>
</div>
</div>
<!-- Add Destination Modal -->
<.modal :if={@show_add_destination_modal} id="add-destination-modal" show>

View File

@ -5,6 +5,7 @@ defmodule AlgoraWeb.VideoLive do
alias Algora.{Accounts, Library, Storage, Chat}
alias AlgoraWeb.{LayoutComponent, Presence}
alias AlgoraWeb.ChannelLive.{StreamFormComponent}
alias AlgoraWeb.RTMPDestinationIconComponent
@impl true
def render(assigns) do
@ -405,6 +406,11 @@ defmodule AlgoraWeb.VideoLive do
id={id}
class="group hover:bg-white/5 relative px-4"
>
<RTMPDestinationIconComponent.icon
:if={message.platform != "algora"}
class="inline-flex w-5 h-5 shrink-0 mr-0.5"
icon={String.to_atom(message.platform)}
/>
<span class={"font-semibold #{if(system_message?(message), do: "text-emerald-400", else: "text-indigo-400")}"}>
<%= message.sender_handle %>:
</span>

View File

@ -76,6 +76,7 @@ defmodule Algora.MixProject do
{:telemetry_poller, "~> 1.0"},
{:thumbnex, "~> 0.5.0"},
{:timex, "~> 3.0"},
{:websockex, "~> 0.4.3"},
# ex_aws
{:ex_aws_s3, "~> 2.3"},
{:ex_doc, "~> 0.29.0"},

View File

@ -136,6 +136,7 @@
"vix": {:hex, :vix, "0.27.0", "c9d6be17abe6fd1b3daed52964331c67ff1f980ea188499d8ac5e723cf215576", [:make, :mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:cc_precompiler, "~> 0.1.4 or ~> 0.2", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.7.3 or ~> 0.8", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:kino, "~> 0.7", [hex: :kino, repo: "hexpm", optional: true]}], "hexpm", "ae4ba5bb9882753396baadfff93b6cab5d4275b13751fd49723591eb116f373a"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
"websockex": {:hex, :websockex, "0.4.3", "92b7905769c79c6480c02daacaca2ddd49de936d912976a4d3c923723b647bf0", [:mix], [], "hexpm", "95f2e7072b85a3a4cc385602d42115b73ce0b74a9121d0d6dbbf557645ac53e4"},
"xla": {:hex, :xla, "0.6.0", "67bb7695efa4a23b06211dc212de6a72af1ad5a9e17325e05e0a87e4c241feb8", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "dd074daf942312c6da87c7ed61b62fb1a075bced157f1cc4d47af2d7c9f44fb7"},
"zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"},
}

View File

@ -0,0 +1,23 @@
defmodule Algora.Repo.Local.Migrations.CreateEntities do
use Ecto.Migration
def change do
create table(:entities) do
add :user_id, references(:users)
add :name, :string
add :handle, :citext, null: false
add :avatar_url, :string
add :platform, :string, null: false
add :platform_id, :string, null: false
add :platform_meta, :map, default: "{}", null: false
timestamps()
end
create unique_index(:entities, [:platform, :platform_id])
create unique_index(:entities, [:platform, :handle])
create index(:entities, [:platform])
create index(:entities, [:platform_id])
create index(:entities, [:handle])
end
end

View File

@ -0,0 +1,28 @@
defmodule Algora.Repo.Local.Migrations.ModifyMessagesForEntities do
use Ecto.Migration
def change do
alter table(:messages) do
add :entity_id, references(:entities)
end
execute """
INSERT INTO entities (user_id, name, handle, avatar_url, platform, platform_id, platform_meta, inserted_at, updated_at)
SELECT id, name, handle, avatar_url, 'algora', id::text, '{}', NOW(), NOW()
FROM users
"""
execute """
UPDATE messages
SET entity_id = entities.id
FROM entities
WHERE messages.user_id = entities.user_id
"""
alter table("messages") do
modify :entity_id, :integer, null: false
end
create index(:messages, [:entity_id])
end
end