J
J
jajabin2020-04-03 00:23:23
go
jajabin, 2020-04-03 00:23:23

How to more correctly determine the session of a UDP connection?

Listening to messages from the udp socket, I would like to somehow determine where the packets come from and scatter them across sessions in order to get a more detailed report on the incoming data, I just did it head-on, searching for the current session and writing to its channel, I would like to know if there is more elegant methods ?

func serve(ctx context.Context, addr string, port int) <-chan Message {
  type session struct {
    conn         *net.UDPAddr
    expiration   int64
    buffer       chan []byte
    run          func()
  }

  var (
    out         = make(chan Message, 64)
    done        = make(chan error, 1)
    wg          sync.WaitGroup
    localAddres = &net.UDPAddr{IP: net.ParseIP(addr), Port: port}
    bufPool     = sync.Pool{New: func() interface{} { return make([]byte, bufferSize) }}
    sessions    []*session

    // TODO может будет не линейный поиск, пока посмотрим так
    getSession = func(addr *net.UDPAddr) (*session, bool) {
      for _, s := range sessions {
        if reflect.DeepEqual(s.conn, addr) {
          return s, true
        }
      }
      return nil, false
    }
    addSession = func(s *session) {
      sessions = append(sessions, s)
    }
    removeSession = func(sess *session) {
      for i, s := range sessions {
        if s.id == sess.id {
          sessions = sessions[:i+copy(sessions[i:], sessions[i+1:])]
        }
      }
    }
    gc = func() {
      for {
        <-time.After(time.Duration(3 * time.Second))
        for _, s := range sessions {
          if time.Now().UnixNano() > s.expiration && s.expiration > 0 {
            removeSession(s)
          }
        }
      }
    }
  )
  go gc()
  go func() {
    pc, err := net.ListenUDP("udp", localAddres)
    if err != nil {
      done <- err
    }
    defer pc.Close()
    go func() {
      for {
        buff := bufPool.Get().([]byte)
        size, addr, err := pc.ReadFromUDP(buff[0:])
        if err != nil {
          done <- err
          return
        }
        switch s, ok := getSession(addr); ok {
        case true:
          s.buffer <- buff[0:size]
          bufPool.Put(buff)
          s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
          atomic.AddInt64(&s.countMessage, 1)
          atomic.AddInt64(&s.len, int64(size))
        case false:
          s := &session{
            id:         rand.Int(),
            conn:       addr,
            expiration: time.Now().UnixNano(),
            buffer:     make(chan []byte, 64),
            run: func() {//working},
          }
          wg.Add(1)
          go s.run()
          addSession(s)
          s.buffer <- buff[0:size]
          bufPool.Put(buff)
          
        }
      }
    }()
    select {
    case <-ctx.Done():
      wg.Wait()
      err = ctx.Err()
    case err = <-done:
      panic(err)
    }
  }()

  return out
}

Answer the question

In order to leave comments, you need to log in

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question