Answer the question
In order to leave comments, you need to log in
How do I correctly determine which session a UDP packet belongs to?
I am listening for messages on a UDP socket and would like to determine where each packet comes from and distribute the packets across sessions, so that I can produce a more detailed report on the incoming data. For now I did it the straightforward way: I search for the current session and write to its channel. Are there more elegant methods?
func serve(ctx context.Context, addr string, port int) <-chan Message {
type session struct {
conn *net.UDPAddr
expiration int64
buffer chan []byte
run func()
}
var (
out = make(chan Message, 64)
done = make(chan error, 1)
wg sync.WaitGroup
localAddres = &net.UDPAddr{IP: net.ParseIP(addr), Port: port}
bufPool = sync.Pool{New: func() interface{} { return make([]byte, bufferSize) }}
sessions []*session
// TODO может будет не линейный поиск, пока посмотрим так
getSession = func(addr *net.UDPAddr) (*session, bool) {
for _, s := range sessions {
if reflect.DeepEqual(s.conn, addr) {
return s, true
}
}
return nil, false
}
addSession = func(s *session) {
sessions = append(sessions, s)
}
removeSession = func(sess *session) {
for i, s := range sessions {
if s.id == sess.id {
sessions = sessions[:i+copy(sessions[i:], sessions[i+1:])]
}
}
}
gc = func() {
for {
<-time.After(time.Duration(3 * time.Second))
for _, s := range sessions {
if time.Now().UnixNano() > s.expiration && s.expiration > 0 {
removeSession(s)
}
}
}
}
)
go gc()
go func() {
pc, err := net.ListenUDP("udp", localAddres)
if err != nil {
done <- err
}
defer pc.Close()
go func() {
for {
buff := bufPool.Get().([]byte)
size, addr, err := pc.ReadFromUDP(buff[0:])
if err != nil {
done <- err
return
}
switch s, ok := getSession(addr); ok {
case true:
s.buffer <- buff[0:size]
bufPool.Put(buff)
s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
atomic.AddInt64(&s.countMessage, 1)
atomic.AddInt64(&s.len, int64(size))
case false:
s := &session{
id: rand.Int(),
conn: addr,
expiration: time.Now().UnixNano(),
buffer: make(chan []byte, 64),
run: func() {//working},
}
wg.Add(1)
go s.run()
addSession(s)
s.buffer <- buff[0:size]
bufPool.Put(buff)
}
}
}()
select {
case <-ctx.Done():
wg.Wait()
err = ctx.Err()
case err = <-done:
panic(err)
}
}()
return out
}
Answer the question
In order to leave comments, you need to log in
Didn't find what you were looking for?
Ask a question
731 491 924 answers to any question