1
0
mirror of https://github.com/algora-io/tv.git synced 2025-03-17 20:17:45 +02:00

add read replicas & dns clustering for distributed elixir nodes (#7)

This commit is contained in:
Zafer Cesur 2024-03-11 21:55:17 +03:00 committed by GitHub
parent dc6f20a554
commit dd1b31c4cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 59 additions and 47 deletions

View File

@ -8,8 +8,7 @@
import Config
config :algora,
replica: Algora.ReplicaRepo,
ecto_repos: [Algora.Repo],
ecto_repos: [Algora.Repo.Local],
rtmp_port: 9006
# Configures the endpoint

View File

@ -31,8 +31,7 @@ config :algora, Algora.Repo,
show_sensitive_data_on_connection_error: true,
pool_size: 10
# Configure your replica database
config :algora, Algora.ReplicaRepo,
config :algora, Algora.Repo.Local,
url: System.get_env("DATABASE_URL"),
show_sensitive_data_on_connection_error: true,
pool_size: 10,

View File

@ -19,8 +19,6 @@ if config_env() == :prod do
For example: ecto://USER:PASS@HOST/DATABASE
"""
replica_database_url = System.get_env("REPLICA_DATABASE_URL") || database_url
host = System.get_env("PHX_HOST") || "example.com"
ecto_ipv6? = System.get_env("ECTO_IPV6") == "true"
@ -28,17 +26,19 @@ if config_env() == :prod do
System.get_env("FLY_APP_NAME") ||
raise "FLY_APP_NAME not available"
config :algora, dns_cluster_query: System.get_env("DNS_CLUSTER_QUERY")
config :algora, Algora.Repo,
# ssl: true,
socket_options: if(ecto_ipv6?, do: [:inet6], else: []),
url: database_url,
pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")
config :algora, Algora.ReplicaRepo,
config :algora, Algora.Repo.Local,
# ssl: true,
priv: "priv/repo",
socket_options: if(ecto_ipv6?, do: [:inet6], else: []),
url: replica_database_url,
url: database_url,
pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")
secret_key_base =

View File

@ -1,8 +1,5 @@
import Config
config :algora,
replica: Algora.Repo
# Configure your database
#
# The MIX_TEST_PARTITION environment variable can be used
@ -16,11 +13,8 @@ config :algora, Algora.Repo,
pool: Ecto.Adapters.SQL.Sandbox,
pool_size: 10
config :algora, Algora.ReplicaRepo,
username: "postgres",
password: "postgres",
database: "algora_test#{System.get_env("MIX_TEST_PARTITION")}",
hostname: "localhost",
config :algora, Algora.Repo.Local,
url: System.get_env("DATABASE_URL"),
show_sensitive_data_on_connection_error: true,
pool_size: 10,
priv: "priv/repo"

View File

@ -4,7 +4,7 @@
#
app = 'algora-media'
primary_region = 'otp'
primary_region = 'lax'
kill_signal = 'SIGTERM'
kill_timeout = '5s'
@ -17,6 +17,8 @@ kill_timeout = '5s'
release_command = '/app/bin/migrate'
[env]
DNS_CLUSTER_QUERY = 'algora-media.internal'
PRIMARY_REGION = 'lax'
PHX_HOST = 'tv.algora.io'
PORT = '4000'

View File

@ -6,11 +6,11 @@ defmodule Algora.Accounts do
alias Algora.Accounts.{User, Identity}
def list_users(opts) do
Repo.replica().all(from u in User, limit: ^Keyword.fetch!(opts, :limit))
Repo.all(from u in User, limit: ^Keyword.fetch!(opts, :limit))
end
def get_users_map(user_ids) when is_list(user_ids) do
Repo.replica().all(from u in User, where: u.id in ^user_ids, select: {u.id, u})
Repo.all(from u in User, where: u.id in ^user_ids, select: {u.id, u})
end
def admin?(%User{} = user) do
@ -53,11 +53,11 @@ defmodule Algora.Accounts do
** (Ecto.NoResultsError)
"""
def get_user!(id), do: Repo.replica().get!(User, id)
def get_user!(id), do: Repo.get!(User, id)
def get_user(id), do: Repo.replica().get(User, id)
def get_user(id), do: Repo.get(User, id)
def get_user_by!(fields), do: Repo.replica().get_by!(User, fields)
def get_user_by!(fields), do: Repo.get_by!(User, fields)
## User registration

View File

@ -28,9 +28,12 @@ defmodule Algora.Application do
children = [
{Cluster.Supervisor, [topologies, [name: Algora.ClusterSupervisor]]},
{Task.Supervisor, name: Algora.TaskSupervisor},
# Start the RPC server
{Fly.RPC, []},
# Start the Ecto repository
Algora.Repo,
Algora.ReplicaRepo,
Algora.Repo.Local,
# Start the supervisor for LSN tracking
{Fly.Postgres.LSN.Supervisor, repo: Algora.Repo.Local},
# Start the Telemetry supervisor
AlgoraWeb.Telemetry,
# Start the PubSub system
@ -38,6 +41,8 @@ defmodule Algora.Application do
# Start presence
AlgoraWeb.Presence,
{Finch, name: Algora.Finch},
# Clustering setup
{DNSCluster, query: Application.get_env(:algora, :dns_cluster_query) || :ignore},
# Start the Endpoint (http/https)
AlgoraWeb.Endpoint,
# Start the RTMP server

View File

@ -19,14 +19,14 @@ defmodule Algora.Chat do
select_merge: %{sender_handle: u.handle}
)
|> order_by_inserted(:asc)
|> Repo.replica().all()
|> Repo.all()
end
defp order_by_inserted(%Ecto.Query{} = query, direction) when direction in [:asc, :desc] do
from(s in query, order_by: [{^direction, s.inserted_at}])
end
def get_message!(id), do: Repo.replica().get!(Message, id)
def get_message!(id), do: Repo.get!(Message, id)
def create_message(attrs \\ %{}) do
%Message{}

View File

@ -200,7 +200,7 @@ defmodule Algora.Library do
select_merge: %{channel_name: u.name}
)
|> order_by_inserted(:desc)
|> Repo.replica().all()
|> Repo.all()
end
def list_shorts(limit \\ 100) do
@ -212,7 +212,7 @@ defmodule Algora.Library do
select_merge: %{channel_name: u.name}
)
|> order_by_inserted(:desc)
|> Repo.replica().all()
|> Repo.all()
end
def list_channel_videos(%Channel{} = channel, limit \\ 100) do
@ -224,7 +224,7 @@ defmodule Algora.Library do
where: v.user_id == ^channel.user_id
)
|> order_by_inserted(:desc)
|> Repo.replica().all()
|> Repo.all()
end
def list_active_channels(opts) do
@ -234,7 +234,7 @@ defmodule Algora.Library do
order_by: [desc: u.updated_at],
select: struct(u, [:id, :handle, :channel_tagline, :avatar_url, :external_homepage_url])
)
|> Repo.replica().all()
|> Repo.all()
|> Enum.map(&get_channel!/1)
end
@ -284,7 +284,7 @@ defmodule Algora.Library do
end
end
def get_video!(id), do: Repo.replica().get!(Video, id)
def get_video!(id), do: Repo.get!(Video, id)
def update_video(%Video{} = video, attrs) do
video
@ -302,7 +302,7 @@ defmodule Algora.Library do
def list_subtitles(%Video{} = video) do
from(s in Subtitle, where: s.video_id == ^video.id, order_by: [asc: s.start])
|> Repo.replica().all()
|> Repo.all()
end
def get_subtitle!(id), do: Repo.get!(Subtitle, id)

View File

@ -1,23 +1,20 @@
defmodule Algora.Repo do
defmodule Algora.Repo.Local do
use Ecto.Repo,
otp_app: :algora,
adapter: Ecto.Adapters.Postgres
def replica, do: Algora.config([:replica])
@env Mix.env()
@locks %{playlist: 1}
# Dynamically configure the database url based on runtime and build
# environments.
def init(_type, config) do
# url = Fly.Postgres.rewrite_database_url!(config)
# dbg(url)
def multi_transaction_lock(multi, scope, id) when is_atom(scope) and is_integer(id) do
scope_int = Map.fetch!(@locks, scope)
Ecto.Multi.run(multi, scope, fn repo, _changes ->
repo.query("SELECT pg_advisory_xact_lock(#{scope_int}, #{id})")
end)
Fly.Postgres.config_repo_url(config, @env)
end
end
defmodule Algora.ReplicaRepo do
use Ecto.Repo,
otp_app: :algora,
adapter: Ecto.Adapters.Postgres
defmodule Algora.Repo do
use Fly.Repo, local_repo: Algora.Repo.Local
end

View File

@ -35,6 +35,7 @@ defmodule Algora.MixProject do
{:castore, "~> 0.1.13"},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.3", only: [:dev], runtime: false},
{:dns_cluster, "~> 0.1.1"},
{:ecto_network, "~> 1.3.0"},
{:ecto_sql, "~> 3.6"},
{:esbuild, "~> 0.8", runtime: Mix.env() == :dev},
@ -43,6 +44,7 @@ defmodule Algora.MixProject do
{:ffmpex, "~> 0.10.0"},
{:finch, "~> 0.13"},
{:floki, ">= 0.30.0", only: :test},
{:fly_postgres, "~> 0.3.0"},
{:gettext, "~> 0.18"},
{:heroicons, "~> 0.5.0"},
{:jason, "~> 1.2"},
@ -60,7 +62,6 @@ defmodule Algora.MixProject do
{:phoenix_live_view, "~> 0.20.2"},
{:phoenix, "~> 1.7.11"},
{:plug_cowboy, "~> 2.5"},
{:postgrex, ">= 0.0.0"},
{:swoosh, "~> 1.3"},
{:tailwind, "~> 0.2", runtime: Mix.env() == :dev},
{:telemetry_metrics, "~> 0.6"},

View File

@ -16,6 +16,7 @@
"db_connection": {:hex, :db_connection, "2.4.2", "f92e79aff2375299a16bcb069a14ee8615c3414863a6fef93156aee8e86c2ff3", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4fe53ca91b99f55ea249693a0229356a08f4d1a7931d8ffa79289b145fe83668"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ecto": {:hex, :ecto, "3.9.6", "2f420c173efcb2e22fa4f8fc41e75e02b3c5bd4cffef12085cae5418c12e530d", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df17bc06ba6f78a7b764e4a14ef877fe5f4499332c5a105ace11fe7013b72c84"},
"ecto_network": {:hex, :ecto_network, "1.3.0", "1e77fa37c20e0f6a426d3862732f3317b0fa4c18f123d325f81752a491d7304e", [:mix], [{:ecto_sql, ">= 3.0.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:phoenix_html, ">= 0.0.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:postgrex, ">= 0.14.0", [hex: :postgrex, repo: "hexpm", optional: false]}], "hexpm", "053a5e46ef2837e8ea5ea97c82fa0f5494699209eddd764e663c85f11b2865bd"},
@ -27,12 +28,15 @@
"ex_aws_s3": {:hex, :ex_aws_s3, "2.5.3", "422468e5c3e1a4da5298e66c3468b465cfd354b842e512cb1f6fbbe4e2f5bdaf", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "4f09dd372cc386550e484808c5ac5027766c8d0cd8271ccc578b82ee6ef4f3b8"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
"ex_m3u8": {:hex, :ex_m3u8, "0.9.0", "54a12463320236aab09402bc69676f665e692636235a2b186a22df507ebc5643", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "d57939a90d8da5956264d27a516c5e2ac80b09c8adbe4e3199d7d14c79549b5c"},
"expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"},
"exsync": {:hex, :exsync, "0.3.0", "39ab8b3d4e5fe779a34ad930135145283ebf56069513dfdfaad4e30a04b158c7", [:mix], [{:file_system, "~> 0.2", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "2030d085a14fa5f685d53d97171a21345dddaf2b67a0927263efc6b2cd2bb09f"},
"ffmpex": {:hex, :ffmpex, "0.10.0", "ce29281eac60bf109c05acb4342eecf813a3cd3f08c1bce350423caad86128af", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:rambo, "~> 0.3.0", [hex: :rambo, repo: "hexpm", optional: false]}], "hexpm", "de8d81f8c51cc258dcee9a3e0b1568b0659c97be004557d9af47795206cff53b"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"finch": {:hex, :finch, "0.13.0", "c881e5460ec563bf02d4f4584079e62201db676ed4c0ef3e59189331c4eddf7b", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "49957dcde10dcdc042a123a507a9c5ec5a803f53646d451db2f7dea696fba6cc"},
"floki": {:hex, :floki, "0.34.0", "002d0cc194b48794d74711731db004fafeb328fe676976f160685262d43706a8", [:mix], [], "hexpm", "9c3a9f43f40dde00332a589bd9d389b90c1f518aef500364d00636acc5ebc99c"},
"gettext": {:hex, :gettext, "0.20.0", "75ad71de05f2ef56991dbae224d35c68b098dd0e26918def5bb45591d5c8d429", [:mix], [], "hexpm", "1c03b177435e93a47441d7f681a7040bd2a816ece9e2666d1c9001035121eb3d"},
"fly_postgres": {:hex, :fly_postgres, "0.3.4", "aa2e8845bafefb2c998446e4b886efbb3a74d52ff360f6c1a0500c0547b1aef2", [:mix], [{:ecto_sql, ">= 3.4.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:fly_rpc, "~> 0.3.0", [hex: :fly_rpc, repo: "hexpm", optional: false]}, {:postgrex, ">= 0.0.0", [hex: :postgrex, repo: "hexpm", optional: false]}], "hexpm", "39c98e433cedfb450c39969612d0dd5628967e2a5394647c3ade43c7c357d183"},
"fly_rpc": {:hex, :fly_rpc, "0.3.0", "a122cf0bc545403d8ad84f81b915ff0edb3c05f090519aa0e8b37fbb0e16ad83", [:mix], [], "hexpm", "5bbbc691b05659e80081cdf769630834e31bfca973d0b54be5191ce6ed33df72"},
"gettext": {:hex, :gettext, "0.24.0", "6f4d90ac5f3111673cbefc4ebee96fe5f37a114861ab8c7b7d5b30a1108ce6d8", [:mix], [{:expo, "~> 0.5.1", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "bdf75cdfcbe9e4622dd18e034b227d77dd17f0f133853a1c73b97b3d6c770e8b"},
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
"heroicons": {:hex, :heroicons, "0.5.3", "ee8ae8335303df3b18f2cc07f46e1cb6e761ba4cf2c901623fbe9a28c0bc51dd", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "a210037e8a09ac17e2a0a0779d729e89c821c944434c3baa7edfc1f5b32f3502"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},

View File

@ -0,0 +1,11 @@
defmodule Algora.Repo.Migrations.AddFlyPostgresProc do
use Ecto.Migration
def up do
Fly.Postgres.Migrations.V01.up()
end
def down do
Fly.Postgres.Migrations.V01.down()
end
end