Feat: Added listener count

This commit is contained in:
zuma 2025-03-12 19:52:05 +01:00
parent e2d735a6e5
commit 560ea833b1
7 changed files with 227 additions and 7 deletions

View file

@ -37,12 +37,21 @@ type ConnectionPool struct {
}
type MetadataConnection struct {
metadataSent bool
metadataSent bool
}
type MetadataConnectionPool struct {
MetadataConnectionMap map[*MetadataConnection]struct{}
mu sync.Mutex
MetadataConnectionMap map[*MetadataConnection]struct{}
mu sync.Mutex
}
type PeopleConnection struct {
numberSent int
}
type PeopleConnectionPool struct {
PeopleConnectionMap map[*PeopleConnection]struct{}
mu sync.Mutex
}
// Audio connection pool
@ -103,6 +112,24 @@ func (cp *MetadataConnectionPool) Broadcast() {
}
}
// People connection pool
func (cp *PeopleConnectionPool) AddConnection(connection *PeopleConnection) {
defer cp.mu.Unlock()
cp.mu.Lock()
cp.PeopleConnectionMap[connection] = struct{}{}
}
func (cp *PeopleConnectionPool) DeleteConnection(connection *PeopleConnection) {
defer cp.mu.Unlock()
cp.mu.Lock()
delete(cp.PeopleConnectionMap, connection)
}
func NewPeopleConnectionPool() *PeopleConnectionPool {
peopleConnectionMap := make(map[*PeopleConnection]struct{})
return &PeopleConnectionPool{PeopleConnectionMap: peopleConnectionMap}
}
func getFileDelay(mp3FilePath string) int64 {
t := 0.0
size := 0
@ -209,6 +236,7 @@ func streamFolder(connectionPool *ConnectionPool, metadataConnectionPool *Metada
func main() {
connPool := NewConnectionPool()
metadataConnPool := NewMetadataConnectionPool()
peopleConnPool := NewPeopleConnectionPool()
music_files := []string{}
@ -235,6 +263,7 @@ func main() {
http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "audio/mp3")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Add("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
@ -268,6 +297,13 @@ func main() {
connection := &MetadataConnection{metadataSent: false}
metadataConnPool.AddConnection(connection)
go func(done <-chan struct{}) {
<-done
metadataConnPool.DeleteConnection(connection)
log.Printf("%s's connection to the metadata stream has been closed\n", r.Host)
}(r.Context().Done())
log.Printf("%s has connected to the metadata stream\n", r.Host)
// Simulate sending events (you can replace this with real data)
for {
@ -281,7 +317,11 @@ func main() {
if err != nil {
log.Fatal(err)
}
fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%s", finalData))
if _, err := fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%s", finalData)); err != nil {
metadataConnPool.DeleteConnection(connection)
log.Printf("%s's connection to the metadata stream has been closed\n", r.Host)
return
}
w.(http.Flusher).Flush()
connection.metadataSent = true
}
@ -289,6 +329,40 @@ func main() {
}
})
http.HandleFunc("/listeners", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Expose-Headers", "Content-Type")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
connection := &PeopleConnection{numberSent: 0}
peopleConnPool.AddConnection(connection)
go func(done <-chan struct{}) {
<-done
peopleConnPool.DeleteConnection(connection)
log.Printf("%s's connection to the listeners stream has been closed\n", r.Host)
}(r.Context().Done())
log.Printf("%s has connected to the listeners stream\n", r.Host)
for {
if(connection.numberSent != len(connPool.ConnectionMap)) {
connection.numberSent = len(connPool.ConnectionMap)
// n, err := fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%d", connection.numberSent))
_, err := w.Write([]byte(fmt.Sprintf("data: %d\n\n", connection.numberSent)))
if err != nil {
log.Fatal(err)
return
}
}
w.(http.Flusher).Flush()
time.Sleep(time.Second * 1)
}
})
log.Println("Listening on port 8080...")
log.Fatal(http.ListenAndServe(":8080", nil))