P
P
Pavel Sakharov2019-12-06 13:29:34
go
Pavel Sakharov, 2019-12-06 13:29:34

Why does RabbitMQ RPC work only 3-4 times, after which the consumers drop off?

Good day. I have run into a problem and cannot find any logical explanation for it.
I am implementing RPC over RabbitMQ in Go. The scheme is as follows:
Client:
1. Forms a request to the "RPC" persistent queue
2. Sets a unique CorrelationId in the message
3. Listens to the "RPC Answers" persistent queue
4. When receiving a message, checks whether the CorrelationId matches the previously specified one
5. If the CorrelationId matches, sends an ACK, then closes the channel and the connection.
Server
1. Listens to the "RPC" queue
2. When receiving a message, acknowledges it (ACK)
3. Forms a response to the message and sends it to the "RPC Answers" queue, with the CorrelationId set to the CorrelationId of the received message.
The scheme works and all data passes correctly, but during testing no more than 3-4 such cycles complete; after that the client freezes waiting for a response, and some time later the server drops off as a consumer.
Are there any thoughts?
Below is the top-level code of the main logic
Client:

// main is the RPC client loop: once per second it opens a fresh engine,
// sends a "LoadIpLists" command through mq.RPC, decodes the JSON reply and
// pretty-prints it.
func main() {
  for {
    mq, err := rMQ.NewEngine(os.Getenv("MQ_HOST"), os.Getenv("MQ_USER"), os.Getenv("MQ_PASS"), os.Getenv("MQ_PORT"))
    if err != nil {
      log.Fatal(err)
    }

    command := Types.PRCCommand{Action: "LoadIpLists", Params: ""}

    result, err := mq.RPC(command.ToJson(), "agent")
    mq.Close()
    if err != nil {
      // The reply is meaningless when the call failed; decoding it would only
      // produce a second, misleading JSON error. Wait and retry instead.
      log.Println(err)
      time.Sleep(1 * time.Second)
      continue
    }

    Res := struct {
      Text Types.IpListsDump
    }{}
    if err := json.Unmarshal(result, &Res); err != nil {
      log.Println(err)
    }

    _, _ = pp.Println(Res)
    time.Sleep(1 * time.Second)
  }
}

// RPC publishes body to the "RPC" queue and blocks until a delivery whose
// CorrelationId matches this call arrives on the "RPC_ANSWERS" queue.
//
// agent is passed to Consume as the consumer tag. The correlation ID is the
// string form of the current time.
//
// It returns the body of the matching reply. An error is returned when the
// channel cannot be opened, the reply queue cannot be declared, the request
// cannot be published, the consumer cannot be started, or the delivery stream
// ends before a matching reply is seen (amqp.ErrClosed in that last case).
func (engine Engine) RPC(body []byte, agent string) ([]byte, error) {
  rpcID := time.Now().String()

  ch, err := engine.Connection.Channel()
  if err != nil {
    return nil, err
  }
  // Close the channel on every exit path, including the early error returns
  // below. A failure to close is only logged: it must not mask a reply that
  // was received successfully.
  defer func() {
    if cerr := ch.Close(); cerr != nil {
      log.Println(cerr)
    }
  }()

  replyQueue, err := ch.QueueDeclare("RPC_ANSWERS", false, false, false, false, nil)
  if err != nil {
    return nil, err
  }

  err = ch.Publish("", "RPC", false, false, amqp.Publishing{
    ContentType:   "application/json",
    Body:          body,
    ReplyTo:       replyQueue.Name,
    CorrelationId: rpcID,
  })
  if err != nil {
    return nil, err
  }

  deliveries, err := ch.Consume(replyQueue.Name, agent, false, false, false, false, nil)
  if err != nil {
    // Without a consumer the delivery channel is nil, and ranging over a nil
    // channel blocks forever, so the failure must be reported here.
    return nil, err
  }

  for msg := range deliveries {
    if msg.CorrelationId != rpcID {
      continue
    }
    // NOTE(review): Ack(true) acknowledges this delivery and every earlier
    // unacknowledged one on the channel, i.e. the non-matching replies skipped
    // above. Confirm that dropping them is intended if several clients share
    // the reply queue.
    if err := msg.Ack(true); err != nil {
      log.Println(err)
    }
    return msg.Body, nil
  }

  // The delivery stream ended (channel or connection closed) with no reply.
  return nil, amqp.ErrClosed
}

Server
func main{
       mq, error := rMQ.NewEngine(os.Getenv("MQ_HOST"), os.Getenv("MQ_USER"), os.Getenv("MQ_PASS"), os.Getenv("MQ_PORT"))
  if error != nil {
    log.Fatal(error)
  }
  go mq.ListenSourceMessage("RPC", false, app.HandlePRCRequest)
  for {
    _ = true
  }
}

// ListenSourceMessage dials a new AMQP connection from the engine's
// User/Pass/Host/Port fields, declares queue s, and calls Func for every
// delivery consumed from it, passing the connection along so the handler can
// publish a reply. It blocks until the delivery stream ends.
//
// s is the queue name; exclusive is forwarded to Consume. Deliveries are
// consumed with auto-acknowledge enabled under the consumer tag "RootServer".
// Connection, channel and queue-declaration errors go through fatalOnError
// (presumably terminating the process; defined elsewhere).
func (engine *Engine) ListenSourceMessage(s string, exclusive bool, Func func(msg amqp.Delivery, connection *amqp.Connection)) {
  // URI layout is amqp://user:password@host:port/.
  conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", engine.User, engine.Pass, engine.Host, engine.Port))
  fatalOnError(err)
  defer conn.Close()

  ch, err := conn.Channel()
  fatalOnError(err)

  _, err = ch.QueueDeclare(s, true, false, false, false, nil)
  fatalOnError(err)

  msgs, err := ch.Consume(s, "RootServer", true, exclusive, false, false, nil)
  if err != nil {
    // Without a consumer msgs is nil, and ranging over a nil channel would
    // block forever while looking like a healthy listener.
    log.Println(err)
    return
  }

  for d := range msgs {
    Func(d, conn)
  }
}

// SendAnswer executes the command for the received delivery and publishes the
// outcome to the queue named in msg.ReplyTo, copying msg.CorrelationId so the
// caller can match the reply to its request.
//
// On success the body is the JSON form of the execution result; on failure it
// is the JSON encoding of the error value. All failures are logged, nothing is
// returned to the caller.
func (command *PRCCommand) SendAnswer(msg amqp.Delivery, conn *amqp.Connection) {
  channel, e := conn.Channel()
  if e != nil {
    // No channel means nothing can be published; continuing would dereference
    // a nil channel in the deferred Close and in Publish.
    log.Println(e)
    return
  }
  defer channel.Close()

  var body []byte
  jsonbleCommand, e := command.Execute(msg)
  if e != nil {
    log.Println(e.Error())
    // NOTE(review): json.Marshal on an error value only emits its exported
    // fields, so plain errors encode as "{}" — confirm the client expects that.
    payload, merr := json.Marshal(e)
    if merr != nil {
      log.Println(merr)
    }
    body = payload
  } else {
    body = jsonbleCommand.ToJson()
  }

  // NOTE(review): the request side sets ContentType, while this reply sets
  // Type — confirm which property consumers actually read.
  e = channel.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
    Body:          body,
    Type:          "application/json",
    CorrelationId: msg.CorrelationId,
  })
  if e != nil {
    log.Println(e)
  }
}

Answer the question

In order to leave comments, you need to log in

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question