1
0
mirror of https://github.com/pocketbase/pocketbase.git synced 2025-03-27 16:25:41 +02:00

added subscription.Message.WriteSSE method

This commit is contained in:
Gani Georgiev 2025-02-21 13:04:23 +02:00
parent 973916bb48
commit 4db497c5e1
6 changed files with 3481 additions and 3409 deletions

View File

@ -9,7 +9,9 @@
- Added `$os.stat(file)` JSVM helper ([#6407](https://github.com/pocketbase/pocketbase/discussions/6407)).
- Added `Store.SetFunc(key, func(old T) new T)` to set/update a store value with the return result of the callback in a concurrent safe manner.
- Added `store.Store.SetFunc(key, func(old T) new T)` to set/update a store value with the return result of the callback in a concurrent safe manner.
- Added `subscription.Message.WriteSSE(w, id)` for writing an SSE formatted message into the provided writer interface (_usually used for unit testing_).
## v0.25.6

View File

@ -84,11 +84,10 @@ func realtimeConnect(e *core.RequestEvent) error {
Data: []byte(`{"clientId":"` + ce.Client.Id() + `"}`),
}
connectMsgErr := ce.App.OnRealtimeMessageSend().Trigger(connectMsgEvent, func(me *core.RealtimeMessageEvent) error {
me.Response.Write([]byte("id:" + me.Client.Id() + "\n"))
me.Response.Write([]byte("event:" + me.Message.Name + "\n"))
me.Response.Write([]byte("data:"))
me.Response.Write(me.Message.Data)
me.Response.Write([]byte("\n\n"))
err := me.Message.WriteSSE(me.Response, me.Client.Id())
if err != nil {
return err
}
return me.Flush()
})
if connectMsgErr != nil {
@ -123,11 +122,10 @@ func realtimeConnect(e *core.RequestEvent) error {
msgEvent.Client = ce.Client
msgEvent.Message = &msg
msgErr := ce.App.OnRealtimeMessageSend().Trigger(msgEvent, func(me *core.RealtimeMessageEvent) error {
me.Response.Write([]byte("id:" + me.Client.Id() + "\n"))
me.Response.Write([]byte("event:" + me.Message.Name + "\n"))
me.Response.Write([]byte("data:"))
me.Response.Write(me.Message.Data)
me.Response.Write([]byte("\n\n"))
err := me.Message.WriteSSE(me.Response, me.Client.Id())
if err != nil {
return err
}
return me.Flush()
})
if msgErr != nil {

File diff suppressed because it is too large Load Diff

View File

@ -13,12 +13,6 @@ import (
const optionsParam = "options"
// Message defines a client's channel data.
type Message struct {
Name string `json:"name"`
Data []byte `json:"data"`
}
// SubscriptionOptions defines the request options (query params, headers, etc.)
// for a single subscription topic.
type SubscriptionOptions struct {

View File

@ -0,0 +1,37 @@
package subscriptions
import (
"io"
)
// Message defines a client's channel data.
type Message struct {
Name string `json:"name"`
Data []byte `json:"data"`
}
// WriteSSE writes the current message in a SSE format into the provided writer.
//
// For example, writing to a router.Event:
//
// m := Message{Name: "users/create", Data: []byte{...}}
// m.Write(e.Response, "yourEventId")
// e.Flush()
func (m *Message) WriteSSE(w io.Writer, eventId string) error {
parts := [][]byte{
[]byte("id:" + eventId + "\n"),
[]byte("event:" + m.Name + "\n"),
[]byte("data:"),
m.Data,
[]byte("\n\n"),
}
for _, part := range parts {
_, err := w.Write(part)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,25 @@
package subscriptions_test
import (
"strings"
"testing"
"github.com/pocketbase/pocketbase/tools/subscriptions"
)
func TestMessageWrite(t *testing.T) {
m := subscriptions.Message{
Name: "test_name",
Data: []byte("test_data"),
}
var sb strings.Builder
m.WriteSSE(&sb, "test_id")
expected := "id:test_id\nevent:test_name\ndata:test_data\n\n"
if v := sb.String(); v != expected {
t.Fatalf("Expected writer content\n%q\ngot\n%q", expected, v)
}
}