mentalaradio/server.go
2025-03-12 17:44:13 +01:00

295 lines
6.8 KiB
Go

package main
import (
"bytes"
"io"
"log"
"fmt"
"net/http"
"os"
"encoding/base64"
"encoding/json"
"sync"
"time"
"path/filepath"
"github.com/dhowden/tag"
"github.com/tcolgate/mp3"
"math/rand"
"math"
)
const (
BUFFERSIZE = 8192
)
var artworkAsUrl = ""
var title = ""
var url = ""
type Connection struct {
bufferChannel chan []byte
buffer []byte
}
type ConnectionPool struct {
ConnectionMap map[*Connection]struct{}
mu sync.Mutex
}
type MetadataConnection struct {
metadataSent bool
}
type MetadataConnectionPool struct {
MetadataConnectionMap map[*MetadataConnection]struct{}
mu sync.Mutex
}
// Audio connection pool
func (cp *ConnectionPool) AddConnection(connection *Connection) {
defer cp.mu.Unlock()
cp.mu.Lock()
cp.ConnectionMap[connection] = struct{}{}
}
func (cp *ConnectionPool) DeleteConnection(connection *Connection) {
defer cp.mu.Unlock()
cp.mu.Lock()
delete(cp.ConnectionMap, connection)
}
func NewConnectionPool() *ConnectionPool {
connectionMap := make(map[*Connection]struct{})
return &ConnectionPool{ConnectionMap: connectionMap}
}
func (cp *ConnectionPool) Broadcast(buffer []byte) {
defer cp.mu.Unlock()
cp.mu.Lock()
for connection := range cp.ConnectionMap {
copy(connection.buffer, buffer)
select {
case connection.bufferChannel <- connection.buffer:
default:
}
}
}
// Metadata connection pool
func (cp *MetadataConnectionPool) AddConnection(connection *MetadataConnection) {
defer cp.mu.Unlock()
cp.mu.Lock()
cp.MetadataConnectionMap[connection] = struct{}{}
}
func (cp *MetadataConnectionPool) DeleteConnection(connection *MetadataConnection) {
defer cp.mu.Unlock()
cp.mu.Lock()
delete(cp.MetadataConnectionMap, connection)
}
func NewMetadataConnectionPool() *MetadataConnectionPool {
metadataConnectionMap := make(map[*MetadataConnection]struct{})
return &MetadataConnectionPool{MetadataConnectionMap: metadataConnectionMap}
}
func (cp *MetadataConnectionPool) Broadcast() {
defer cp.mu.Unlock()
cp.mu.Lock()
for connection := range cp.MetadataConnectionMap {
connection.metadataSent = false
}
}
func getFileDelay(mp3FilePath string) int64 {
t := 0.0
size := 0
file, err := os.Open(mp3FilePath)
if err != nil {
log.Fatal(err)
}
d := mp3.NewDecoder(file)
var f mp3.Frame
skipped := 0
for {
if err := d.Decode(&f, &skipped); err != nil {
if err == io.EOF {
break
}
log.Println(err)
return 0
}
t = t + f.Duration().Seconds()
size = size + f.Size()
}
track_duration := 1000 * t
delayVal := int64(math.Floor(track_duration)) * BUFFERSIZE / int64(size)
log.Print(delayVal)
return delayVal
}
func streamFolder(connectionPool *ConnectionPool, metadataConnectionPool *MetadataConnectionPool, list []string) {
buffer := make([]byte, BUFFERSIZE)
for _, music := range list {
file, err := os.Open(filepath.Join("./music/", music))
if err != nil {
log.Fatal(err)
}
m, err := tag.ReadFrom(file)
if err != nil {
log.Fatal(err)
}
title = m.Title()
log.Print(title)
rawData := m.Raw()
for _, v := range rawData {
if _, isTagPicture := v.(*tag.Picture); isTagPicture {
artworkMIMEType := v.(*tag.Picture).MIMEType
artworkAsUrl = fmt.Sprintf("data:%s;base64,%s",artworkMIMEType, base64.StdEncoding.EncodeToString(m.Picture().Data))
}
if _, isTagComm := v.(*tag.Comm); isTagComm {
entryDescription := v.(*tag.Comm).Description
if entryDescription == "purl" {
url = v.(*tag.Comm).Text
}
}
}
log.Print(url)
metadataConnectionPool.Broadcast()
ctn, err := io.ReadAll(file)
if err != nil {
log.Fatal(err)
}
// clear() is a new builtin function introduced in go 1.21. Just reinitialize the buffer if on a lower version.
clear(buffer)
tempfile := bytes.NewReader(ctn)
delay := getFileDelay(filepath.Join("./music/", music))
ticker := time.NewTicker(time.Millisecond * time.Duration(delay))
// Send one buffer in advance to avoid client choking
for range ticker.C {
_, err := tempfile.Read(buffer)
if err == io.EOF {
ticker.Stop()
break
}
connectionPool.Broadcast(buffer)
}
}
}
func main() {
connPool := NewConnectionPool()
metadataConnPool := NewMetadataConnectionPool()
music_files := []string{}
entries, err := os.ReadDir("./music")
if err != nil {
log.Fatal(err)
}
for _, e := range entries {
if filepath.Ext(e.Name()) == ".mp3" {
music_files = append(music_files, e.Name())
}
}
log.Printf("%d Music file found.", len(music_files))
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(music_files), func(i, j int) { music_files[i], music_files[j] = music_files[j], music_files[i] })
go streamFolder(connPool, metadataConnPool, music_files)
http.Handle("/", http.FileServer(http.Dir("./dist")))
http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "audio/mp3")
w.Header().Add("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
log.Println("Could not create flusher")
}
connection := &Connection{bufferChannel: make(chan []byte), buffer: make([]byte, BUFFERSIZE)}
connPool.AddConnection(connection)
log.Printf("%s has connected to the audio stream\n", r.Host)
for {
buf := <-connection.bufferChannel
if _, err := w.Write(buf); err != nil {
connPool.DeleteConnection(connection)
log.Printf("%s's connection to the audio stream has been closed\n", r.Host)
return
}
flusher.Flush()
clear(connection.buffer)
}
})
http.HandleFunc("/metadata", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Expose-Headers", "Content-Type")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
connection := &MetadataConnection{metadataSent: false}
metadataConnPool.AddConnection(connection)
log.Printf("%s has connected to the metadata stream\n", r.Host)
// Simulate sending events (you can replace this with real data)
for {
if(connection.metadataSent == false) {
data := map[string]string{ "title":"", "url":"", "artwork":"" }
data["title"] = title
data["url"] = url
data["artwork"] = artworkAsUrl
finalData, err := json.Marshal(data)
if err != nil {
log.Fatal(err)
}
fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("%s", finalData))
w.(http.Flusher).Flush()
connection.metadataSent = true
}
time.Sleep(time.Second * 1)
}
})
log.Println("Listening on port 8080...")
log.Fatal(http.ListenAndServe(":8080", nil))
}