Answer the question
In order to leave comments, you need to log in
How to understand the reason for the canceled context?
I wrote an http handler that accepts a request from a WAV. My task is to transfer the audio in the stream to the backend in the format where it is decoded into psm, that is, the waf header is simply removed and resampled. Next, I need to accept the response from the backend in the form of a channel, resample it in the original sample rate and attach the waf header. And send http response.
I wrote tests with mocks to check that a request is sent to the backend. And the response is received from the backend.
I have a problem with pipelines. Goroutines return a context canceled error. That is, as I understand it, this is not DeadlineExceed. This is exactly where the context is cancelled. I don't do it anywhere. How can I determine where this is happening?
I've tried increasing the timeouts but that doesn't help.
This is my mock test.
const (
serveEndTimeout = time.Millisecond * 300
)
type errResponse struct {
Code int `json:"status"`
Message string `json:"message"`
}
func TestHandler_handleDenoise(t *testing.T) {
log.Init(log.Debug)
logger := log.Get()
mtr := &metrics.Metrics{}
mtr.Init(nil)
tests := []struct {
name string
cfg *api.Config
contentType string
userID string
requestID string
audioSource io.Reader
expectedBackendRequest *denoiser.Request
backendResponses []*denoiser.Response
expectedStatusCode int
expectedErrResp string
}{
{
name: "correct request",
cfg: &api.Config{
ReadTimeout: types.NewDuration("1s"),
WriteTimeout: types.NewDuration("1s"),
HTTPRequestMaxSize: types.NewUnit("1MiB"),
},
contentType: "audio/x-wav",
userID: "user",
requestID: "bar",
audioSource: bytes.NewReader(append(audio.GenerateHeaderWav(audio.WavPCM, 1, 14000, 2, 16), []byte("1234567")...)),
expectedStatusCode: http.StatusOK,
expectedBackendRequest: &denoiser.Request{
RequestID: "bar",
AudioEncoding: audio.PCM_s16le,
SampleRate: 14000,
TargetSampleRate: 48000,
},
backendResponses: []*denoiser.Response{
{Audio: []byte("1234567")},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.TODO()
serveHTTPDone := make(chan struct{})
denoiserMock := makeMockDenoiser(ctx, tt.expectedBackendRequest, tt.backendResponses)
a := acl.New(&acl.Config{Users: map[string]acl.UserConfig{
"user": {},
"userBlocked": {},
}})
h := NewHandler(tt.cfg, logger, mtr, denoiserMock, a)
serv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(util.SetRequestUserID(r.Context(), tt.userID))
r = r.WithContext(util.SetRequestID(r.Context(), tt.requestID))
h.ServeHTTP(w, r)
close(serveHTTPDone)
}))
defer serv.Close()
u, err := url.Parse(serv.URL)
require.Nil(t, err)
u.Path = httpPathPattern
resp, err := http.Post(u.String(), tt.contentType, tt.audioSource)
require.Nil(t, err)
defer func() {
assert.Nil(t, resp.Body.Close())
}()
require.Equal(t, tt.expectedStatusCode, resp.StatusCode)
if tt.expectedErrResp != "" {
var errResp errResponse
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&errResp); err != nil && err != io.EOF {
t.Errorf("parsing response")
}
fmt.Printf("%s\n\n", errResp.Message)
require.Equal(t, tt.expectedErrResp, errResp.Message)
require.Equal(t, tt.expectedStatusCode, errResp.Code)
}
select {
case <-serveHTTPDone:
case <-time.After(serveEndTimeout):
t.Fatal("client request serving didn't end")
}
})
}
}
func newResponsesChannel(ctx context.Context, responses []*denoiser.Response) <-chan *denoiser.Response {
ch := make(chan *denoiser.Response)
go func() {
defer close(ch)
for _, response := range responses {
select {
case <-ctx.Done():
fmt.Printf("new Responce context\n")
return
case ch <- response:
}
// time.Sleep(30 * time.Millisecond)
}
}()
return ch
}
func makeMockDenoiser(ctx context.Context, expectedRequest *denoiser.Request, responses []*denoiser.Response) *mockDenoiser {
m := &mockDenoiser{}
if expectedRequest != nil {
m.On("Denoise", mock.Anything, mock.MatchedBy(func(actualRequest *denoiser.Request) bool {
return *expectedRequest == *actualRequest
}), mock.Anything).Return(newResponsesChannel(ctx, responses)).Once()
}
return m
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = log.ExtendCtx(ctx, "handler", "denoise")
(hidden block)
h.logger.Infof(ctx, "Handling request: %+v", req)
g, gCtx := errgroup.WithContext(ctx)
audioChunks := make(chan []byte)
responsesCh := denoise(ctx, h.backend.Denoise(gCtx, req, audioChunks), req)
var speechSize int
g.Go(func() error {
defer close(audioChunks)
var err error
speechSize, err = h.processIncomingAudioChunks(gCtx, r, audioChunks)
return err
})
g.Go(func() error {
return h.processBackendResponses(gCtx, w, responsesCh)
})
err = g.Wait() // err always "context cancelled"
if err != nil {
h.handleDenoiseError(ctx, err, w)
return
} else {
h.logger.Info(ctx, "Finished request successfully")
h.metrics.SuccessfulRequests.With(h.metricLabels).Inc()
}
billingCtx := util.ContextWithDenoiseBilling(ctx, speechSize)
h.logger.Info(billingCtx, "Billing info")
}
// resemple to source sampleRate and add WAV header.
func denoise( //nolint: funlen, gocognit
ctx context.Context, resp <-chan *denoiser.Response, req *denoiser.Request,
) <-chan *denoiser.Response {
response := make(chan *denoiser.Response)
decodedAudio := make(chan []byte)
rawData := make(chan []byte)
ok := true
eg, gCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
decodedResults := audio.Decode(
gCtx,
rawData,
audio.FromEncoding(audio.PCM_s16le),
audio.FromSampleRate(denoiser.NativeSampleRate),
audio.ToSampleRate(req.SampleRate),
audio.TargetChannels(1),
)
var (
res *audio.Result
ok bool
)
for {
select {
case <-gCtx.Done():
return gCtx.Err()
case res, ok = <-decodedResults:
}
if !ok {
break
}
if res.Err != nil {
return &denoiser.AudioDecoderError{}
}
select {
case <-gCtx.Done():
return gCtx.Err()
case decodedAudio <- res.Data[0]:
}
}
close(decodedAudio)
return nil
})
data := &denoiser.Response{}
eg.Go(func() error {
for {
data = &denoiser.Response{}
select {
case <-gCtx.Done():
return gCtx.Err()
case data, ok = <-resp:
}
if ok {
rawData <- data.Audio
} else {
close(rawData)
return nil
}
}
})
eg.Go(func() error {
chunk := new(denoiser.Response)
// header generating
chunk.Audio = audio.GenerateHeaderWav(audio.WavPCM, 1, uint32(req.SampleRate), 2, 16)
response <- chunk
for {
chunk = new(denoiser.Response)
select {
case <-gCtx.Done():
return gCtx.Err()
case chunk.Audio, ok = <-decodedAudio:
}
if !ok {
return nil
}
response <- chunk
}
})
go func() {
if err := eg.Wait(); err != nil { // !
select {
case <-ctx.Done():
case response <- &denoiser.Response{Err: err}:
}
return
}
close(response)
}()
return response
}
func (h *Handler) processIncomingAudioChunks(
ctx context.Context, r *http.Request, audio chan<- []byte,
) (int, error) {
var err error
n := 0
speechSize := 0
for {
audioChunk := make([]byte, chunkSize)
err = contextutil.DoWithTimeout(ctx, func() error {
var err error
n, err = r.Body.Read(audioChunk)
return err
}, h.cfg.ReadTimeout.ToNative())
if err == io.EOF {
if n == 0 {
return speechSize, nil
}
} else if err != nil {
return speechSize, err
}
if n > 0 {
select {
case <-ctx.Done():
return speechSize, ctx.Err()
case audio <- audioChunk[:n]:
speechSize += n
}
}
}
}
Answer the question
In order to leave comments, you need to log in
Solved a problem! It's all about the auto-generated mock. The `h.backend.Denoise()` mock did not read from the audioChunks channel. Therefore, there was a deadlock and the corresponding cancellation of the context. And if you add a buffer to the channel, then the program did not get stuck. For me, the main question was why the program returned StatusOK if there were problems with reading from the channel, but I forgot that I myself added the return of slice bytes from the mock, so the program worked. Thanks to everyone who wanted to help!
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question