Skip to content

Commit

Permalink
Merge pull request #69 from singchia/feat/example-rtmp
Browse files Browse the repository at this point in the history
examples: add rtmp for realtime message proxy
  • Loading branch information
singchia committed Jun 21, 2024
2 parents 1d23b70 + aeb0cd3 commit 2f67382
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 0 deletions.
22 changes: 22 additions & 0 deletions examples/rtmp/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
PREFIX?=/usr
BINDIR?=$(PREFIX)/bin

GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)

.PHONY: all
all: rtmp_service rtmp_edge

.PHONY: clean
clean:
rm rtmp_service rtmp_edge

.PHONY: rtmp_service
rtmp_service: service/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_service service/*.go

.PHONY: rtmp_edge
rtmp_edge: edge/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_edge edge/*.go
62 changes: 62 additions & 0 deletions examples/rtmp/edge/edge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"fmt"
"io"
"net"
"sync"

"github.com/singchia/frontier/api/dataplane/v1/edge"
"github.com/spf13/pflag"
)

func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30012", "address to dial")
name := pflag.String("name", "alice", "user name to join chatroom")
listen := pflag.String("listen", "127.0.0.1:1935", "rtmp port to proxy")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
cli, err := edge.NewNoRetryEdge(dialer, edge.OptionEdgeMeta([]byte(*name)))
if err != nil {
fmt.Println("new edge err:", err)
return
}
for {
ln, err := net.Listen("tcp", *listen)
if err != nil {
return
}
for {
netconn, err := ln.Accept()
if err != nil {
fmt.Printf("accept err: %s\n", err)
break
}
go func() {
st, err := cli.OpenStream("rtmp")
if err != nil {
fmt.Printf("open stream err: %s\n", err)
return
}
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
defer wg.Done()
io.Copy(st, netconn)
netconn.Close()
st.Close()
}()
go func() {
defer wg.Done()
io.Copy(netconn, st)
netconn.Close()
st.Close()
}()
wg.Wait()
}()
}
}
}
86 changes: 86 additions & 0 deletions examples/rtmp/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"fmt"
"net"
"sync"

"github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/joy4/av/avutil"
"github.com/singchia/joy4/av/pktque"
"github.com/singchia/joy4/av/pubsub"
"github.com/singchia/joy4/format"
"github.com/singchia/joy4/format/rtmp"
"github.com/spf13/pflag"
)

func init() {
format.RegisterAll()
}

func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30011", "address to dial")
pflag.Parse()

// service
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
svc, err := service.NewService(dialer, service.OptionServiceName("rtmp"))
if err != nil {
fmt.Println("new service err:", err)
return
}
// rtmp service
rtmpserver := &rtmp.Server{}

l := &sync.RWMutex{}
type Channel struct {
que *pubsub.Queue
}
channels := map[string]*Channel{}

rtmpserver.HandlePlay = func(conn *rtmp.Conn) {
fmt.Println(conn.URL.Path)
l.RLock()
ch := channels[conn.URL.Path]
l.RUnlock()

if ch != nil {
cursor := ch.que.Latest()
filters := pktque.Filters{}

demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}

avutil.CopyFile(conn, demuxer)
}
}
rtmpserver.HandlePublish = func(conn *rtmp.Conn) {
l.Lock()
ch := channels[conn.URL.Path]
if ch == nil {
ch = &Channel{}
ch.que = pubsub.NewQueue()
channels[conn.URL.Path] = ch
} else {
ch = nil
}
l.Unlock()
if ch == nil {
return
}

avutil.CopyFile(ch.que, conn)

l.Lock()
delete(channels, conn.URL.Path)
l.Unlock()
ch.que.Close()
}

rtmpserver.Serve(svc)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/rabbitmq/amqp091-go v1.9.0
github.com/singchia/geminio v1.1.7-rc.1
github.com/singchia/go-timer/v2 v2.2.1
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6
github.com/soheilhy/cmux v0.1.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand All @@ -33,6 +34,7 @@ require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/btree v1.4.2 // indirect
github.com/tidwall/gjson v1.14.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 h1:Yp0zFEufLz0H7jzffb4UPXijavlyqlYeOg7dcyVUNnQ=
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369/go.mod h1:aFJ1ZwLjvHN4yEzE5Bkz8rD8/d8Vlj3UIuvz2yfET7I=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand All @@ -149,6 +151,8 @@ github.com/singchia/geminio v1.1.7-rc.1/go.mod h1:LkgZj4Ddja97vP7NWQk7TffFLZAosH
github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624=
github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6 h1:B9MVqDiyqKAjHmYYFNjOPYHqhml8rA1ogKs8rYTfZ00=
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6/go.mod h1:apGwjKmzM7JlKFbd/KANpq6T8Y5Ntr8Jjhq1BmKU/FA=
github.com/singchia/yafsm v1.0.1 h1:TTDSX7SBCr2YNdv/DZ76LjTer0rYwm7IPt24odNymUs=
github.com/singchia/yafsm v1.0.1/go.mod h1:fSWQl6DCzqc51DhLfwHr3gN2FhGmOEjTAQ2AOKDSBtY=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
Expand Down

0 comments on commit 2f67382

Please sign in to comment.