add pubsub

This commit is contained in:
Anbraten 2024-09-20 09:03:42 +02:00
parent abcc6094c5
commit 597fac4b0c
No known key found for this signature in database
GPG Key ID: B1222603899C6B25
7 changed files with 123 additions and 36 deletions

View File

@ -9,7 +9,7 @@ import (
"code.gitea.io/gitea/services/websocket"
)
func Init(r *web.Route) {
func Init(r *web.Router) {
m := websocket.Init()
r.Any("/-/ws", func(ctx *context.Context) {

View File

@ -5,6 +5,7 @@ package pubsub
import (
"context"
"errors"
"sync"
)
@ -21,19 +22,21 @@ func NewMemory() Broker {
}
}
func (p *Memory) Publish(_ context.Context, message Message) {
func (p *Memory) Publish(_ context.Context, _topic string, data []byte) error {
p.Lock()
topic, ok := p.topics[message.Topic]
topic, ok := p.topics[_topic]
if !ok {
p.Unlock()
return
return errors.New("topic not found")
}
for s := range topic {
go (*s)(message)
go (*s)(data)
}
p.Unlock()
return nil
}
func (p *Memory) Subscribe(c context.Context, topic string, subscriber Subscriber) {

View File

@ -14,12 +14,9 @@ import (
func TestPubsub(t *testing.T) {
var (
wg sync.WaitGroup
testMessage = Message{
Data: []byte("test"),
Topic: "hello-world",
}
wg sync.WaitGroup
testTopic = "hello-world"
testMessage = []byte("test")
)
ctx, cancel := context.WithCancelCause(
@ -28,10 +25,10 @@ func TestPubsub(t *testing.T) {
broker := NewMemory()
go func() {
broker.Subscribe(ctx, "hello-world", func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
broker.Subscribe(ctx, testTopic, func(message []byte) { assert.Equal(t, testMessage, message); wg.Done() })
}()
go func() {
broker.Subscribe(ctx, "hello-world", func(_ Message) { wg.Done() })
broker.Subscribe(ctx, testTopic, func(_ []byte) { wg.Done() })
}()
// Wait a bit for the subscriptions to be registered
@ -39,7 +36,7 @@ func TestPubsub(t *testing.T) {
wg.Add(2)
go func() {
broker.Publish(ctx, testMessage)
broker.Publish(ctx, testTopic, testMessage)
}()
wg.Wait()

View File

@ -0,0 +1,56 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package pubsub
import (
"context"
issues_model "code.gitea.io/gitea/models/issues"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
notify_service "code.gitea.io/gitea/services/notify"
)
func Init() Broker {
broker := NewMemory() // TODO: allow for other pubsub implementations
notify_service.RegisterNotifier(newNotifier(broker))
return broker
}
type pubsubNotifier struct {
notify_service.NullNotifier
broker Broker
}
// NewNotifier create a new pubsubNotifier notifier
func newNotifier(broker Broker) notify_service.Notifier {
return &pubsubNotifier{
broker: broker,
}
}
func (p *pubsubNotifier) DeleteComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment) {
data := struct {
Function string
Comment *issues_model.Comment
Doer *user_model.User
}{
Function: "DeleteComment",
Comment: c,
Doer: doer,
}
msg, err := json.Marshal(data)
if err != nil {
log.Error("Failed to marshal message: %v", err)
return
}
err = p.broker.Publish(ctx, "notify", msg)
if err != nil {
log.Error("Failed to publish message: %v", err)
return
}
}

View File

@ -5,19 +5,10 @@ package pubsub
import "context"
// Message defines a published message.
type Message struct {
// Data is the actual data in the entry.
Data []byte `json:"data"`
// Topic is the topic of the message.
Topic string `json:"topic"`
}
// Subscriber receives published messages.
type Subscriber func(Message)
type Subscriber func(data []byte)
type Broker interface {
Publish(c context.Context, message Message)
Publish(c context.Context, topic string, data []byte) error
Subscribe(c context.Context, topic string, subscriber Subscriber)
}

View File

@ -7,21 +7,22 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/templates"
notify_service "code.gitea.io/gitea/services/notify"
"code.gitea.io/gitea/services/pubsub"
"github.com/olahol/melody"
)
var _ notify_service.Notifier = &websocketNotifier{}
type websocketNotifier struct {
notify_service.NullNotifier
m *melody.Melody
rnd *templates.HTMLRender
melody *melody.Melody
htmlRenderer *templates.HTMLRender
}
// NewNotifier create a new webhooksNotifier notifier
func newNotifier(m *melody.Melody, pubsub pubsub.Broker) notify_service.Notifier {
func newNotifier(m *melody.Melody) notify_service.Notifier {
return &websocketNotifier{
m: m,
rnd: templates.HTMLRenderer(),
melody: m,
htmlRenderer: templates.HTMLRenderer(),
}
}
@ -31,7 +32,7 @@ func newNotifier(m *melody.Melody, pubsub pubsub.Broker) notify_service.Notifier
var htmxRemoveElement = "<div hx-swap-oob=\"delete:%s\"></div>"
func (n *websocketNotifier) filterSessions(fn func(*melody.Session, *sessionData) bool) []*melody.Session {
sessions, err := n.m.Sessions()
sessions, err := n.melody.Sessions()
if err != nil {
log.Error("Failed to get sessions: %v", err)
return nil

View File

@ -4,9 +4,14 @@
package websocket
import (
"context"
issues_model "code.gitea.io/gitea/models/issues"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/services/context"
notify_service "code.gitea.io/gitea/services/notify"
"code.gitea.io/gitea/modules/log"
gitea_context "code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/pubsub"
"github.com/mitchellh/mapstructure"
@ -31,12 +36,46 @@ func Init() *melody.Melody {
m.HandleDisconnect(handleDisconnect)
broker := pubsub.NewMemory() // TODO: allow for other pubsub implementations
notify_service.RegisterNotifier(newNotifier(m, broker))
notifier := newNotifier(m)
ctx, unsubscribe := context.WithCancel(context.Background())
graceful.GetManager().RunAtShutdown(ctx, func() {
unsubscribe()
})
broker.Subscribe(ctx, "notify", func(msg []byte) {
data := struct {
Function string
}{}
err := json.Unmarshal(msg, &data)
if err != nil {
log.Error("Failed to unmarshal message: %v", err)
return
}
switch data.Function {
case "DeleteComment":
var data struct {
Comment *issues_model.Comment
Doer *user_model.User
}
err := json.Unmarshal(msg, &data)
if err != nil {
log.Error("Failed to unmarshal message: %v", err)
return
}
notifier.DeleteComment(context.Background(), data.Doer, data.Comment)
}
})
return m
}
func handleConnect(s *melody.Session) {
ctx := context.GetWebContext(s.Request)
ctx := gitea_context.GetWebContext(s.Request)
data := &sessionData{}
if ctx.IsSigned {