R
R
Romses Panagiotis2021-06-09 12:54:51
go
Romses Panagiotis, 2021-06-09 12:54:51

How to correctly limit the RPC call by timeout?

An RPC call is used via AMQP (adapted from the example in rpc_client.go): after sending a message, we listen, waiting for a message with a given CorrelationId.

Original code snippet

resp, err := srv.RPCClient.Call(topic, payload)
if err != nil {
  srv.logger.Error("RPC call error. ", err)
  return err
}


func (c *AMQPClient) Call(topic string, body []byte) (response []byte, err error) {
  // ...
  // publish message with CorrelationId
  // ...

  // Consume messages in loop
  for msg := range msgs {
    if corrID == msg.CorrelationId {
      return msg.Body, nil
    }
  }

  err = errors.New("lost CorrelationId=" + corrID)
  c.logger.Error(err)

  return nil, err
}



Now we need to add a call operation timeout. I decided that I needed to add a context inside the Call.
I do so

ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
defer cancel()

resp, err := srv.RPCClient.Call(ctx, topic, payload)
if err != nil { // должен вернуть ошибку, если таймаут
  srv.logger.Error("RPC call error. ", err)
  return err
}


func (c *AMQPClient) Call(ctx context.Context, topic string, body []byte) (response []byte, err error) {
  // ...
  // publish message with CorrelationId
  // ...

  // Consume messages in loop
  /*
    for msg := range msgs {
      if corrID == msg.CorrelationId {
        return msg.Body, nil
      }
    }
  */

  for {
    select {
    case msg := <-msgs:
      if corrID == msg.CorrelationId {
        return msg.Body, nil
      }
    case <-ctx.Done():
      return nil, ctx.Err()
    }
  }

  err = errors.New("lost CorrelationId=" + corrID) //  на этой строчке теперь сообщает: unreachable code
  c.logger.Error(err)

  return nil, err
}


How to do it right, getting rid of "unreachable code"?

Answer the question

In order to leave comments, you need to log in

2 answer(s)
A
Alexander Pavlyuk, 2021-06-09
@romesses

unreachable code can be safely removed
. You already return an error if the context is canceled:

case <-ctx.Done():
      return nil, ctx.Err()

But you need to understand that the context will be canceled by timeout only if this timeout is set for the context.
For example:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := srv.RPCClient.Call(ctx, topic, payload)
if err != nil { // тут вернет ошибку "context cancelled" при таймауте
  srv.logger.Error("RPC call error. ", err)
  return err
}

D
Dmitry Shitskov, 2021-06-09
@Zarom

You can change the code, for example, in this way. And all that after the cycle - to remove. You now have the only way to leave the loop - return with the end of the current function, so all the code after the loop will never be executed

case <-ctx.Done():
  return nil, ctx.Err()
}

case <-ctx.Done():
  err = fmt.Errorf("lost CorrelationId=%s %s", corrID, err.Error())
  c.logger.Error(err)
  return nil, ctx.Err()
}

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question