From 560ea833b1c49d59d378af4324f06216533d920c Mon Sep 17 00:00:00 2001 From: zuma Date: Wed, 12 Mar 2025 19:52:05 +0100 Subject: [PATCH] Feat: Added listener count --- server.go | 82 ++++++++++++++++++++++++++++++++++++++++++++--- src/App.svelte | 32 ++++++++++++++++-- src/assets/d1.svg | 24 ++++++++++++++ src/assets/d2.svg | 24 ++++++++++++++ src/assets/d3.svg | 24 ++++++++++++++ src/assets/d4.svg | 24 ++++++++++++++ src/assets/d5.svg | 24 ++++++++++++++ 7 files changed, 227 insertions(+), 7 deletions(-) create mode 100644 src/assets/d1.svg create mode 100644 src/assets/d2.svg create mode 100644 src/assets/d3.svg create mode 100644 src/assets/d4.svg create mode 100644 src/assets/d5.svg diff --git a/server.go b/server.go index cb2eba2..ce51a20 100644 --- a/server.go +++ b/server.go @@ -37,12 +37,21 @@ type ConnectionPool struct { } type MetadataConnection struct { - metadataSent bool + metadataSent bool } type MetadataConnectionPool struct { - MetadataConnectionMap map[*MetadataConnection]struct{} - mu sync.Mutex + MetadataConnectionMap map[*MetadataConnection]struct{} + mu sync.Mutex +} + +type PeopleConnection struct { + numberSent int +} + +type PeopleConnectionPool struct { + PeopleConnectionMap map[*PeopleConnection]struct{} + mu sync.Mutex } // Audio connection pool @@ -103,6 +112,24 @@ func (cp *MetadataConnectionPool) Broadcast() { } } +// People connection pool +func (cp *PeopleConnectionPool) AddConnection(connection *PeopleConnection) { + defer cp.mu.Unlock() + cp.mu.Lock() + cp.PeopleConnectionMap[connection] = struct{}{} +} + +func (cp *PeopleConnectionPool) DeleteConnection(connection *PeopleConnection) { + defer cp.mu.Unlock() + cp.mu.Lock() + delete(cp.PeopleConnectionMap, connection) +} + +func NewPeopleConnectionPool() *PeopleConnectionPool { + peopleConnectionMap := make(map[*PeopleConnection]struct{}) + return &PeopleConnectionPool{PeopleConnectionMap: peopleConnectionMap} +} + func getFileDelay(mp3FilePath string) int64 { t := 0.0 size := 0 @@ -209,6 +236,7 @@ func streamFolder(connectionPool *ConnectionPool, metadataConnectionPool *Metada func main() { connPool := NewConnectionPool() metadataConnPool := NewMetadataConnectionPool() + peopleConnPool := NewPeopleConnectionPool() music_files := []string{} @@ -235,6 +263,7 @@ func main() { http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "audio/mp3") + w.Header().Set("Cache-Control", "no-cache") w.Header().Add("Connection", "keep-alive") flusher, ok := w.(http.Flusher) @@ -268,6 +297,13 @@ func main() { connection := &MetadataConnection{metadataSent: false} metadataConnPool.AddConnection(connection) + + go func(done <-chan struct{}) { + <-done + metadataConnPool.DeleteConnection(connection) + log.Printf("%s's connection to the metadata stream has been closed\n", r.Host) + }(r.Context().Done()) + log.Printf("%s has connected to the metadata stream\n", r.Host) // Simulate sending events (you can replace this with real data) for { @@ -281,7 +317,11 @@ func main() { if err != nil { log.Fatal(err) } - fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%s", finalData)) + if _, err := fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%s", finalData)); err != nil { + metadataConnPool.DeleteConnection(connection) + log.Printf("%s's connection to the metadata stream has been closed\n", r.Host) + return + } w.(http.Flusher).Flush() connection.metadataSent = true } @@ -289,6 +329,40 @@ func main() { } }) + http.HandleFunc("/listeners", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Expose-Headers", "Content-Type") + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + connection := &PeopleConnection{numberSent: 0} + peopleConnPool.AddConnection(connection) + + go func(done <-chan struct{}) { + <-done + peopleConnPool.DeleteConnection(connection) + log.Printf("%s's connection to the listeners stream has been closed\n", r.Host) + }(r.Context().Done()) + + log.Printf("%s has connected to the listeners stream\n", r.Host) + for { + if(connection.numberSent != len(connPool.ConnectionMap)) { + connection.numberSent = len(connPool.ConnectionMap) + // n, err := fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%d", connection.numberSent)) + _, err := w.Write([]byte(fmt.Sprintf("data: %d\n\n", connection.numberSent))) + + if err != nil { + log.Fatal(err) + return + } + } + w.(http.Flusher).Flush() + time.Sleep(time.Second * 1) + } + }) + log.Println("Listening on port 8080...") log.Fatal(http.ListenAndServe(":8080", nil)) diff --git a/src/App.svelte b/src/App.svelte index 1e27a59..5e9c202 100644 --- a/src/App.svelte +++ b/src/App.svelte @@ -1,10 +1,18 @@