1
0
mirror of https://github.com/algora-io/tv.git synced 2025-03-17 20:17:45 +02:00

rewrite chat with liveview streams & pubsub events (#31)

This commit is contained in:
Zafer Cesur 2024-05-05 02:47:05 +03:00 committed by GitHub
parent 12e17848f4
commit cc1c5ddc4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 161 additions and 246 deletions

View File

@ -1,7 +1,6 @@
import "phoenix_html";
import { Socket } from "phoenix";
import { LiveSocket, type ViewHook } from "phoenix_live_view";
import Chat from "./user_socket";
import topbar from "../vendor/topbar";
import videojs from "../vendor/video";
import "../vendor/videojs-youtube";
@ -10,6 +9,27 @@ import "../vendor/videojs-youtube";
// TODO: enable strict mode
// TODO: eliminate anys
interface PhxEvent extends Event {
target: Element;
detail: Record<string, any>;
}
type PhxEventKey = `js:${string}` | `phx:${string}`;
declare global {
interface Window {
liveSocket: LiveSocket;
addEventListener<K extends keyof WindowEventMap | PhxEventKey>(
type: K,
listener: (
this: Window,
ev: K extends keyof WindowEventMap ? WindowEventMap[K] : PhxEvent
) => any,
options?: boolean | AddEventListenerOptions | undefined
): void;
}
}
let isVisible = (el) =>
!!(el.offsetWidth || el.offsetHeight || el.getClientRects().length > 0);
@ -163,10 +183,20 @@ const Hooks = {
};
this.handleEvent("play_video", playVideo);
this.handleEvent("join_chat", Chat.join);
this.handleEvent("message_deleted", ({ id }) => {
document.querySelector(`#message-${id}`)?.remove();
});
},
},
Chat: {
mounted() {
this.el.scrollTo(0, this.el.scrollHeight);
},
updated() {
const pixelsBelowBottom =
this.el.scrollHeight - this.el.clientHeight - this.el.scrollTop;
if (pixelsBelowBottom < 200) {
this.el.scrollTo(0, this.el.scrollHeight);
}
},
},
NavBar: {

25
assets/js/global.d.ts vendored
View File

@ -1,25 +0,0 @@
import { type Channel } from "phoenix";
import { type LiveSocket } from "phoenix_live_view";
interface PhxEvent extends Event {
target: Element;
detail: Record<string, any>;
}
type PhxEventKey = `js:${string}` | `phx:${string}`;
declare global {
interface Window {
liveSocket: LiveSocket;
userToken?: string;
channel?: Channel;
addEventListener<K extends keyof WindowEventMap | PhxEventKey>(
type: K,
listener: (
this: Window,
ev: K extends keyof WindowEventMap ? WindowEventMap[K] : PhxEvent
) => any,
options?: boolean | AddEventListenerOptions | undefined
): void;
}
}

View File

@ -1,81 +0,0 @@
import { type Channel, Socket } from "phoenix";
const systemUser = (sender) => sender === "algora";
const init = () => {
let socket = new Socket("/socket", { params: { token: window.userToken } });
socket.connect();
let channel: Channel;
let chatInput;
let chatMessages;
let handleSend;
const leave = (channel: Channel) => {
channel.leave();
if (chatInput) {
chatInput.value = "";
chatInput.removeEventListener("keypress", handleSend);
}
};
const join = ({ id }) => {
if (channel) {
leave(channel);
}
channel = socket.channel(`room:${id}`, {});
chatInput = document.querySelector("#chat-input");
chatMessages = document.querySelector("#chat-messages");
chatMessages.scrollTop = chatMessages.scrollHeight;
handleSend = (event) => {
if (event.key === "Enter" && chatInput.value.trim()) {
channel.push("new_msg", { body: chatInput.value });
chatInput.value = "";
}
};
if (chatInput) {
chatInput.addEventListener("keypress", handleSend);
}
channel.on("new_msg", (payload) => {
const messageItem = document.createElement("div");
messageItem.id = `message-${payload.id}`;
messageItem.className = "group hover:bg-white/5 relative px-4";
const senderItem = document.createElement("span");
senderItem.innerText = `${payload.user.handle}: `;
senderItem.className = `font-semibold ${
systemUser(payload.user.handle) ? "text-emerald-400" : "text-indigo-400"
}`;
const bodyItem = document.createElement("span");
bodyItem.innerText = `${payload.body}`;
bodyItem.className = "font-medium text-gray-100";
messageItem.appendChild(senderItem);
messageItem.appendChild(bodyItem);
chatMessages.appendChild(messageItem);
chatMessages.scrollTop = chatMessages.scrollHeight;
});
channel
.join()
.receive("ok", (resp) => {
console.log("Joined successfully", resp);
window.channel = channel;
})
.receive("error", (resp) => {
console.log("Unable to join", resp);
});
};
return { join };
};
const Chat = init();
export default Chat;

View File

@ -8,7 +8,13 @@ defmodule Algora.Chat do
alias Algora.Accounts.User
alias Algora.{Repo, Accounts}
alias Algora.Chat.Message
alias Algora.Chat.{Message, Events}
@pubsub Algora.PubSub
def subscribe_to_room(%Video{} = video) do
Phoenix.PubSub.subscribe(@pubsub, topic(video.id))
end
def list_messages(%Video{} = video) do
# TODO: add limit
@ -48,9 +54,11 @@ defmodule Algora.Chat do
|> Repo.one!()
end
def create_message(attrs \\ %{}) do
def create_message(%User{} = user, %Video{} = video, attrs \\ %{}) do
%Message{}
|> Message.changeset(attrs)
|> Message.put_user(user)
|> Message.put_video(video)
|> Repo.insert()
end
@ -67,4 +75,18 @@ defmodule Algora.Chat do
def change_message(%Message{} = message, attrs \\ %{}) do
Message.changeset(message, attrs)
end
defp topic(video_id) when is_integer(video_id), do: "room:#{video_id}"
defp broadcast!(topic, msg) do
Phoenix.PubSub.broadcast!(@pubsub, topic, {__MODULE__, msg})
end
def broadcast_message_deleted!(message) do
broadcast!(topic(message.video_id), %Events.MessageDeleted{message: message})
end
def broadcast_message_sent!(message) do
broadcast!(topic(message.video_id), %Events.MessageSent{message: message})
end
end

View File

@ -0,0 +1,9 @@
defmodule Algora.Chat.Events do
defmodule MessageSent do
defstruct message: nil
end
defmodule MessageDeleted do
defstruct message: nil
end
end

View File

@ -1,15 +1,15 @@
defmodule Algora.Chat.Message do
use Ecto.Schema
alias Algora.Accounts
alias Algora.Library
alias Algora.Accounts.User
alias Algora.Library.Video
import Ecto.Changeset
schema "messages" do
field :body, :string
field :sender_handle, :string, virtual: true
field :channel_id, :integer, virtual: true
belongs_to :user, Accounts.User
belongs_to :video, Library.Video
belongs_to :user, User
belongs_to :video, Video
timestamps()
end
@ -20,4 +20,12 @@ defmodule Algora.Chat.Message do
|> cast(attrs, [:body])
|> validate_required([:body])
end
def put_user(%Ecto.Changeset{} = changeset, %User{} = user) do
put_assoc(changeset, :user, user)
end
def put_video(%Ecto.Changeset{} = changeset, %Video{} = video) do
put_assoc(changeset, :video, video)
end
end

View File

@ -640,10 +640,6 @@ defmodule Algora.Library do
Phoenix.PubSub.broadcast!(@pubsub, topic, {__MODULE__, msg})
end
def broadcast_message_deleted!(%Channel{} = channel, message) do
broadcast!(topic(channel.user_id), %Events.MessageDeleted{message: message})
end
def broadcast_processing_progressed!(stage, video, pct) do
broadcast!(topic_studio(), %Events.ProcessingProgressed{video: video, stage: stage, pct: pct})
end

View File

@ -26,8 +26,4 @@ defmodule Algora.Library.Events do
defmodule ProcessingFailed do
defstruct video: nil, attempt: nil, max_attempts: nil
end
defmodule MessageDeleted do
defstruct message: nil
end
end

View File

@ -1,32 +0,0 @@
defmodule AlgoraWeb.RoomChannel do
alias Algora.Chat.Message
alias Algora.Repo
use Phoenix.Channel
def join("room:" <> _room_id, _params, socket) do
{:ok, socket}
end
def handle_in("new_msg", %{"body" => body}, socket) do
user = socket.assigns.user
"room:" <> video_id = socket.topic
if user do
message =
Repo.insert!(%Message{
body: body,
user_id: user.id,
video_id: String.to_integer(video_id)
})
broadcast!(socket, "new_msg", %{
user: %{id: user.id, handle: user.handle},
id: message.id,
body: body
})
end
{:noreply, socket}
end
end

View File

@ -1,35 +0,0 @@
defmodule AlgoraWeb.UserSocket do
use Phoenix.Socket
alias Algora.Accounts
channel "room:*", AlgoraWeb.RoomChannel
@impl true
def connect(%{"token" => token}, socket, _connect_info) do
# max_age: 1209600 is equivalent to two weeks in seconds
case Phoenix.Token.verify(socket, "user socket", token, max_age: 1_209_600) do
{:ok, 0} ->
{:ok, socket}
{:ok, user_id} ->
user = Accounts.get_user(user_id)
{:ok, assign(socket, :user, user)}
{:error, _} = error ->
error
end
end
# Socket id's are topics that allow you to identify all sockets for a given user:
#
# def id(socket), do: "user_socket:#{socket.assigns.user_id}"
#
# Would allow you to broadcast a "disconnect" event and terminate
# all active sockets and channels for a given user:
#
# Elixir.AlgoraWeb.Endpoint.broadcast("user_socket:#{user.id}", "disconnect", %{})
#
# Returning `nil` makes this socket anonymous.
@impl true
def id(_socket), do: nil
end

View File

@ -810,7 +810,8 @@ defmodule AlgoraWeb.CoreComponents do
"text-gray-50 focus:outline-none focus:ring-4 sm:text-sm sm:leading-6",
"phx-no-feedback:border-gray-600 phx-no-feedback:focus:border-gray-500 phx-no-feedback:focus:ring-gray-100/5",
"border-gray-600 focus:border-gray-500 focus:ring-gray-100/5",
@errors != [] && "border-red-500 focus:border-red-500 focus:ring-red-500/10"
@errors != [] &&
"border-red-500 focus:border-red-500 focus:ring-red-500/10 placeholder-red-300"
]}
{@rest}
/>

View File

@ -57,9 +57,6 @@
<link href="https://vjs.zencdn.net/8.10.0/video-js.css" rel="stylesheet" />
<link phx-track-static rel="stylesheet" href={~p"/assets/app.css"} />
<script>
window.userToken = "<%= assigns[:user_token] %>";
</script>
<script defer phx-track-static type="text/javascript" src={~p"/assets/app.js"}>
</script>
<script defer data-domain="tv.algora.io" src="https://plausible.io/js/script.js">

View File

@ -40,9 +40,6 @@
<link href="https://vjs.zencdn.net/8.10.0/video-js.css" rel="stylesheet" />
<link phx-track-static rel="stylesheet" href={~p"/assets/app.css"} />
<script>
window.userToken = "<%= assigns[:user_token] %>";
</script>
<script defer phx-track-static type="text/javascript" src={~p"/assets/app.js"}>
</script>
<script defer data-domain="tv.algora.io" src="https://plausible.io/js/script.js">

View File

@ -91,11 +91,7 @@ defmodule AlgoraWeb.UserAuth do
def fetch_current_user(conn, _opts) do
user_id = get_session(conn, :user_id)
user = user_id && Accounts.get_user(user_id)
token = Phoenix.Token.sign(conn, "user socket", user_id || 0)
conn
|> assign(:current_user, user)
|> assign(:user_token, token)
assign(conn, :current_user, user)
end
@doc """

View File

@ -1,10 +1,6 @@
defmodule AlgoraWeb.Embed.Endpoint do
use Phoenix.Endpoint, otp_app: :algora
socket "/socket", AlgoraWeb.UserSocket,
websocket: true,
longpoll: false
socket "/live", Phoenix.LiveView.Socket
# Serve at "/" the static files from "priv/static" directory.

View File

@ -1,10 +1,6 @@
defmodule AlgoraWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :algora
socket "/socket", AlgoraWeb.UserSocket,
websocket: true,
longpoll: false
# The session will be stored in the cookie and signed,
# this means its contents can be read but not tampered with.
# Set :encryption_salt if you would also like to encrypt it.

View File

@ -20,10 +20,11 @@ defmodule AlgoraWeb.ChatLive do
<div>
<div
id="chat-messages"
phx-update="ignore"
phx-hook="Chat"
phx-update="stream"
class="text-sm break-words flex-1 m-1 scrollbar-thin overflow-y-auto inset-0 h-[400px] w-[400px] fixed overflow-hidden py-4 rounded ring-1 ring-purple-300"
>
<div :for={message <- @messages} id={"message-#{message.id}"} class="px-4">
<div :for={{id, message} <- @streams.messages} id={id} class="px-4">
<span class={"font-semibold #{if(system_message?(message), do: "text-emerald-400", else: "text-indigo-400")}"}>
<%= message.sender_handle %>:
</span>
@ -45,17 +46,18 @@ defmodule AlgoraWeb.ChatLive do
Accounts.get_user_by!(handle: channel_handle)
|> Library.get_channel!()
video = Library.get_video!(video_id)
if connected?(socket) do
Library.subscribe_to_livestreams()
Library.subscribe_to_channel(channel)
Chat.subscribe_to_room(video)
Presence.subscribe(channel_handle)
end
videos = Library.list_channel_videos(channel, 50)
video = Library.get_video!(video_id)
subtitles = Library.list_subtitles(%Library.Video{id: video_id})
data = %{}
@ -78,11 +80,11 @@ defmodule AlgoraWeb.ChatLive do
channel: channel,
videos_count: Enum.count(videos),
video: video,
subtitles: subtitles,
messages: Chat.list_messages(video)
subtitles: subtitles
)
|> assign_form(changeset)
|> stream(:videos, videos)
|> stream(:messages, Chat.list_messages(video))
|> stream(:presences, Presence.list_online_users(channel_handle))
if connected?(socket), do: send(self(), {:play, video})
@ -158,10 +160,14 @@ defmodule AlgoraWeb.ChatLive do
end
def handle_info(
{Library, %Library.Events.MessageDeleted{message: message}},
{Chat, %Chat.Events.MessageDeleted{message: message}},
socket
) do
{:noreply, socket |> push_event("message_deleted", %{id: message.id})}
{:noreply, socket |> stream_delete(:messages, message)}
end
def handle_info({Chat, %Chat.Events.MessageSent{message: message}}, socket) do
{:noreply, socket |> stream_insert(:messages, message)}
end
def handle_info({Library, _}, socket), do: {:noreply, socket}

View File

@ -6,6 +6,7 @@ defmodule AlgoraWeb.VideoLive do
alias AlgoraWeb.{LayoutComponent, Presence}
alias AlgoraWeb.ChannelLive.{StreamFormComponent}
@impl true
def render(assigns) do
~H"""
<div class="lg:mr-[24rem]">
@ -215,7 +216,7 @@ defmodule AlgoraWeb.VideoLive do
<div class={[
"overflow-y-auto text-sm break-words flex-1 scrollbar-thin",
if(@can_edit,
do: "h-[calc(100vh-11rem)]",
do: "h-[calc(100vh-12rem)]",
else: "h-[calc(100vh-8.75rem)]"
)
]}>
@ -241,13 +242,13 @@ defmodule AlgoraWeb.VideoLive do
<.simple_form
:if={@can_edit}
id="edit-transcript"
for={@form}
for={@transcript_form}
phx-submit="save"
phx-update="ignore"
class="hidden h-full px-4"
>
<.input
field={@form[:subtitles]}
field={@transcript_form[:subtitles]}
type="textarea"
label="Edit transcript"
class="font-mono h-[calc(100vh-14.75rem)]"
@ -274,12 +275,13 @@ defmodule AlgoraWeb.VideoLive do
<div :if={tab == :chat}>
<div
id="chat-messages"
phx-update="ignore"
class="text-sm break-words flex-1 scrollbar-thin overflow-y-auto h-[calc(100vh-11rem)]"
phx-hook="Chat"
phx-update="stream"
class="text-sm break-words flex-1 scrollbar-thin overflow-y-auto h-[calc(100vh-12rem)]"
>
<div
:for={message <- @messages}
id={"message-#{message.id}"}
:for={{id, message} <- @streams.messages}
id={id}
class="group hover:bg-white/5 relative px-4"
>
<span class={"font-semibold #{if(system_message?(message), do: "text-emerald-400", else: "text-indigo-400")}"}>
@ -301,13 +303,14 @@ defmodule AlgoraWeb.VideoLive do
</div>
</div>
<div class="px-4">
<input
<.simple_form
:if={@current_user}
id="chat-input"
placeholder="Send a message"
disabled={@current_user == nil}
class="mt-2 bg-gray-950 h-[30px] text-white focus:outline-none focus:ring-purple-400 block w-full min-w-0 rounded-md sm:text-sm ring-1 ring-gray-600 px-2"
/>
for={@chat_form}
phx-submit="send"
phx-change="validate"
>
<.input field={@chat_form[:body]} placeholder="Send a message" autocomplete="off" />
</.simple_form>
<a
:if={!@current_user}
href={Algora.Github.authorize_url()}
@ -330,6 +333,7 @@ defmodule AlgoraWeb.VideoLive do
"""
end
@impl true
def mount(
%{"channel_handle" => channel_handle, "video_id" => video_id} = params,
_session,
@ -341,9 +345,12 @@ defmodule AlgoraWeb.VideoLive do
Accounts.get_user_by!(handle: channel_handle)
|> Library.get_channel!()
video = Library.get_video!(video_id)
if connected?(socket) do
Library.subscribe_to_livestreams()
Library.subscribe_to_channel(channel)
Chat.subscribe_to_room(video)
Presence.track_user(channel_handle, %{
id: if(current_user, do: current_user.handle, else: "")
@ -354,8 +361,6 @@ defmodule AlgoraWeb.VideoLive do
videos = Library.list_channel_videos(channel, 50)
video = Library.get_video!(video_id)
subtitles = Library.list_subtitles(%Library.Video{id: video_id})
data = %{}
@ -367,7 +372,7 @@ defmodule AlgoraWeb.VideoLive do
types = %{subtitles: :string}
changeset =
transcript_changeset =
{data, types}
|> Ecto.Changeset.cast(%{subtitles: encoded_subtitles}, Map.keys(types))
@ -381,14 +386,15 @@ defmodule AlgoraWeb.VideoLive do
videos_count: Enum.count(videos),
video: video,
subtitles: subtitles,
messages: Chat.list_messages(video),
tabs: tabs,
# TODO: reenable once fully implemented
# associated segments need to be removed from db & vectorstore
can_edit: false
can_edit: false,
transcript_form: to_form(transcript_changeset, as: :data),
chat_form: to_form(Chat.change_message(%Chat.Message{}))
)
|> assign_form(changeset)
|> stream(:videos, videos)
|> stream(:messages, Chat.list_messages(video))
|> stream(:presences, Presence.list_online_users(channel_handle))
if connected?(socket), do: send(self(), {:play, {video, params["t"]}})
@ -396,11 +402,13 @@ defmodule AlgoraWeb.VideoLive do
{:ok, socket}
end
@impl true
def handle_params(params, _url, socket) do
LayoutComponent.hide_modal()
{:noreply, socket |> apply_action(socket.assigns.live_action, params)}
end
@impl true
def handle_info({:play, {video, t}}, socket) do
socket =
socket
@ -476,10 +484,14 @@ defmodule AlgoraWeb.VideoLive do
end
def handle_info(
{Library, %Library.Events.MessageDeleted{message: message}},
{Chat, %Chat.Events.MessageDeleted{message: message}},
socket
) do
{:noreply, socket |> push_event("message_deleted", %{id: message.id})}
{:noreply, socket |> stream_delete(:messages, message)}
end
def handle_info({Chat, %Chat.Events.MessageSent{message: message}}, socket) do
{:noreply, socket |> stream_insert(:messages, message)}
end
def handle_info({Library, _}, socket), do: {:noreply, socket}
@ -496,6 +508,36 @@ defmodule AlgoraWeb.VideoLive do
end
end
@impl true
def handle_event("validate", %{"message" => %{"body" => ""}}, socket), do: {:noreply, socket}
def handle_event("validate", %{"message" => params}, socket) do
form =
%Chat.Message{}
|> Chat.change_message(params)
|> Map.put(:action, :insert)
|> to_form()
{:noreply, assign(socket, chat_form: form)}
end
def handle_event("send", %{"message" => %{"body" => ""}}, socket), do: {:noreply, socket}
def handle_event("send", %{"message" => params}, socket) do
%{current_user: current_user, video: video} = socket.assigns
case Chat.create_message(current_user, video, params) do
{:ok, message} ->
# HACK:
message = Chat.get_message!(message.id)
Chat.broadcast_message_sent!(message)
{:noreply, assign(socket, chat_form: to_form(Chat.change_message(%Chat.Message{})))}
{:error, %Ecto.Changeset{} = changeset} ->
{:noreply, assign(socket, chat_form: to_form(changeset))}
end
end
def handle_event("save", %{"data" => %{"subtitles" => subtitles}, "save" => save_type}, socket) do
{time, count} = :timer.tc(&save/2, [save_type, subtitles])
msg = "Updated #{count} subtitles in #{fmt(round(time / 1000))} ms"
@ -509,7 +551,7 @@ defmodule AlgoraWeb.VideoLive do
if current_user && Chat.can_delete?(current_user, message) do
{:ok, message} = Chat.delete_message(message)
Library.broadcast_message_deleted!(socket.assigns.channel, message)
Chat.broadcast_message_deleted!(message)
{:noreply, socket}
else
{:noreply, socket |> put_flash(:error, "You can't do that")}
@ -546,10 +588,6 @@ defmodule AlgoraWeb.VideoLive do
if cond, do: list ++ [extra], else: list
end
defp assign_form(socket, %Ecto.Changeset{} = changeset) do
assign(socket, :form, to_form(changeset, as: :data))
end
defp apply_action(socket, :stream, _params) do
if socket.assigns.owns_channel? do
socket