208 lines
5.1 KiB
Go
208 lines
5.1 KiB
Go
package broker
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/labstack/echo/v4"
|
|
"github.com/labstack/gommon/log"
|
|
"net/http"
|
|
)
|
|
|
|
// Broker is is responsible for keeping a list of which clients (browsers)
|
|
// are currently attached and broadcasting events (messages) to those clients.
|
|
type Broker struct {
|
|
|
|
// Create a map of clients, the keys of the map are the channels
|
|
// over which we can push messages to attached clients. (The values
|
|
// are just booleans and are meaningless.)
|
|
//
|
|
clients map[chan Message]bool
|
|
|
|
// Channel into which new clients can be pushed
|
|
//
|
|
newClients chan chan Message
|
|
|
|
// Channel into which disconnected clients should be pushed
|
|
//
|
|
defunctClients chan chan Message
|
|
|
|
// Channel into which messages are pushed to be broadcast out
|
|
// to attahed clients.
|
|
//
|
|
messages chan Message
|
|
|
|
// This function is called every time a new clients connects.
|
|
// Its output is sent to the client as initial connect message.
|
|
init func(c echo.Context) (interface{}, error)
|
|
}
|
|
|
|
// MessageKind represents a message kind
|
|
type MessageKind string
|
|
|
|
// These are all valid message kinds
|
|
const (
|
|
KindInit MessageKind = `init`
|
|
KindUpdate MessageKind = `update`
|
|
KindCreate MessageKind = `create`
|
|
KindDelete MessageKind = `delete`
|
|
)
|
|
|
|
// Message represents a message
|
|
type Message struct {
|
|
Kind MessageKind `json:"kind"`
|
|
Data interface{} `json:"data"`
|
|
}
|
|
|
|
var broker *Broker
|
|
|
|
// Init creates a new local broker
|
|
func Init(init func(c echo.Context) (interface{}, error)) {
|
|
broker = &Broker{
|
|
clients: make(map[chan Message]bool),
|
|
newClients: make(chan (chan Message)),
|
|
defunctClients: make(chan (chan Message)),
|
|
messages: make(chan Message),
|
|
init: init,
|
|
}
|
|
}
|
|
|
|
// Start starts a new goroutine. It handles the addition & removal of clients,
|
|
// as well as the broadcasting of messages out to clients that are currently attached.
|
|
func Start() {
|
|
|
|
if broker.init == nil {
|
|
panic("Init function must not be nil!")
|
|
}
|
|
|
|
// Start a goroutine
|
|
//
|
|
go func() {
|
|
|
|
// Loop endlessly
|
|
//
|
|
for {
|
|
|
|
// Block until we receive from one of the
|
|
// three following channels.
|
|
select {
|
|
|
|
case s := <-broker.newClients:
|
|
|
|
// There is a new client attached and we
|
|
// want to start sending them messages.
|
|
broker.clients[s] = true
|
|
log.Debug("Added new client")
|
|
|
|
case s := <-broker.defunctClients:
|
|
|
|
// A client has dettached and we want to
|
|
// stop sending them messages.
|
|
delete(broker.clients, s)
|
|
close(s)
|
|
|
|
log.Debug("Removed client")
|
|
|
|
case msg := <-broker.messages:
|
|
|
|
// There is a new message to send. For each
|
|
// attached client, push the new message
|
|
// into the client's message channel.
|
|
for s := range broker.clients {
|
|
s <- msg
|
|
}
|
|
log.Printf("Broadcast message to %d clients", len(broker.clients))
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Serve is the web handler clients use to connect to the broker
|
|
func Serve(c echo.Context) error {
|
|
|
|
rw := c.Response().Writer
|
|
|
|
// Make sure that the writer supports flushing.
|
|
//
|
|
f, ok := rw.(http.Flusher)
|
|
if !ok {
|
|
return echo.NewHTTPError(http.StatusInternalServerError, "Streaming not supported!")
|
|
}
|
|
|
|
// Create a new channel, over which the broker can
|
|
// send this client messages.
|
|
messageChan := make(chan Message)
|
|
|
|
// Add this client to the map of those that should
|
|
// receive updates
|
|
broker.newClients <- messageChan
|
|
|
|
// Listen to the closing of the http connection via the CloseNotifier
|
|
// FIXME: use Done()
|
|
notify := rw.(http.CloseNotifier).CloseNotify()
|
|
go func() {
|
|
<-notify
|
|
// Remove this client from the map of attached clients
|
|
// when `EventHandler` exits.
|
|
broker.defunctClients <- messageChan
|
|
log.Debug("HTTP connection closed.")
|
|
}()
|
|
|
|
// Set the headers related to event streaming.
|
|
rw.Header().Set("Content-Type", "text/event-stream")
|
|
rw.Header().Set("Cache-Control", "no-cache")
|
|
rw.Header().Set("Connection", "keep-alive")
|
|
rw.Header().Set("Transfer-Encoding", "chunked")
|
|
|
|
// Push the initial list to the user
|
|
data, err := broker.init(c)
|
|
if err != nil {
|
|
log.Errorf("Error getting initial data: %s", err)
|
|
}
|
|
|
|
sendJSONMessage(&Message{
|
|
Kind: KindInit,
|
|
Data: data,
|
|
}, f, rw)
|
|
|
|
// Don't close the connection, instead loop endlessly.
|
|
for {
|
|
|
|
// Read from our messageChan.
|
|
msg, open := <-messageChan
|
|
|
|
if !open {
|
|
// If our messageChan was closed, this means that the client has
|
|
// disconnected.
|
|
break
|
|
}
|
|
|
|
sendJSONMessage(&msg, f, rw)
|
|
}
|
|
|
|
// Done.
|
|
log.Debug("Finished HTTP request")
|
|
return nil
|
|
}
|
|
|
|
// SendMessage lets any package send a message to clients
|
|
func SendMessage(m Message) {
|
|
broker.messages <- m
|
|
}
|
|
|
|
func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) {
|
|
jsonMessage, err := json.Marshal(m.Data)
|
|
if err != nil {
|
|
log.Errorf("Error serializing json: %s", err)
|
|
}
|
|
|
|
// The format is defined and has to be like this.
|
|
// This makes it however possible to register different event handlers for each event kind
|
|
_, err = fmt.Fprintf(rw, "event:%s\ndata: %s\n\n", m.Kind, string(jsonMessage))
|
|
if err != nil {
|
|
log.Errorf("Error sending message to client: %s", err)
|
|
}
|
|
// Flush the response. This is only possible if
|
|
// the repsonse supports streaming.
|
|
f.Flush()
|
|
}
|