N
N
nakem2021-10-31 15:41:43
go
nakem, 2021-10-31 15:41:43

How to understand the reason for the canceled context?

I wrote an http handler that accepts a request from a WAV. My task is to transfer the audio in the stream to the backend in the format where it is decoded into psm, that is, the waf header is simply removed and resampled. Next, I need to accept the response from the backend in the form of a channel, resample it in the original sample rate and attach the waf header. And send http response.
I wrote tests with mocks to check that a request is sent to the backend. And the response is received from the backend.
I have a problem with pipelines. Goroutines return a context canceled error. That is, as I understand it, this is not DeadlineExceed. This is exactly where the context is cancelled. I don't do it anywhere. How can I determine where this is happening?
I've tried increasing the timeouts but that doesn't help.
This is my mock test.

const (
    serveEndTimeout = time.Millisecond * 300
)

type errResponse struct {
    Code    int    `json:"status"`
    Message string `json:"message"`
}

func TestHandler_handleDenoise(t *testing.T) {
    log.Init(log.Debug)
    logger := log.Get()
    mtr := &metrics.Metrics{}
    mtr.Init(nil)

    tests := []struct {
        name        string
        cfg         *api.Config
        contentType string
        userID      string
        requestID   string
        audioSource io.Reader

        expectedBackendRequest *denoiser.Request
        backendResponses       []*denoiser.Response

        expectedStatusCode int
        expectedErrResp    string
    }{
        {
            name: "correct request",
            cfg: &api.Config{
                ReadTimeout:        types.NewDuration("1s"),
                WriteTimeout:       types.NewDuration("1s"),
                HTTPRequestMaxSize: types.NewUnit("1MiB"),
            },
            contentType:        "audio/x-wav",
            userID:             "user",
            requestID:          "bar",
            audioSource:        bytes.NewReader(append(audio.GenerateHeaderWav(audio.WavPCM, 1, 14000, 2, 16), []byte("1234567")...)),
            expectedStatusCode: http.StatusOK,
            expectedBackendRequest: &denoiser.Request{
                RequestID:        "bar",
                AudioEncoding:    audio.PCM_s16le,
                SampleRate:       14000,
                TargetSampleRate: 48000,
            },
            backendResponses: []*denoiser.Response{
                {Audio: []byte("1234567")},
            },
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            ctx := context.TODO()
            serveHTTPDone := make(chan struct{})

            denoiserMock := makeMockDenoiser(ctx, tt.expectedBackendRequest, tt.backendResponses)
            a := acl.New(&acl.Config{Users: map[string]acl.UserConfig{
                "user":        {},
                "userBlocked": {},
            }})
            h := NewHandler(tt.cfg, logger, mtr, denoiserMock, a)
            serv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                r = r.WithContext(util.SetRequestUserID(r.Context(), tt.userID))
                r = r.WithContext(util.SetRequestID(r.Context(), tt.requestID))
                h.ServeHTTP(w, r)
                close(serveHTTPDone)
            }))
            defer serv.Close()

            u, err := url.Parse(serv.URL)
            require.Nil(t, err)
            u.Path = httpPathPattern
            resp, err := http.Post(u.String(), tt.contentType, tt.audioSource)
            require.Nil(t, err)
            defer func() {
                assert.Nil(t, resp.Body.Close())
            }()

            require.Equal(t, tt.expectedStatusCode, resp.StatusCode)
            if tt.expectedErrResp != "" {
                var errResp errResponse
                decoder := json.NewDecoder(resp.Body)
                if err := decoder.Decode(&errResp); err != nil && err != io.EOF {
                    t.Errorf("parsing response")
                }
                fmt.Printf("%s\n\n", errResp.Message)
                require.Equal(t, tt.expectedErrResp, errResp.Message)
                require.Equal(t, tt.expectedStatusCode, errResp.Code)
            }

            select {
            case <-serveHTTPDone:
            case <-time.After(serveEndTimeout):
                t.Fatal("client request serving didn't end")
            }
        })
    }
}

func newResponsesChannel(ctx context.Context, responses []*denoiser.Response) <-chan *denoiser.Response {
    ch := make(chan *denoiser.Response)
    go func() {
        defer close(ch)
        for _, response := range responses {
            select {
            case <-ctx.Done():
                fmt.Printf("new Responce context\n")
                return
            case ch <- response:
            }
            // time.Sleep(30 * time.Millisecond)
        }
    }()

    return ch
}

func makeMockDenoiser(ctx context.Context, expectedRequest *denoiser.Request, responses []*denoiser.Response) *mockDenoiser {
    m := &mockDenoiser{}
    if expectedRequest != nil {
        m.On("Denoise", mock.Anything, mock.MatchedBy(func(actualRequest *denoiser.Request) bool {
            return *expectedRequest == *actualRequest
        }), mock.Anything).Return(newResponsesChannel(ctx, responses)).Once()
    }
    return m
}


This is my handler

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    ctx = log.ExtendCtx(ctx, "handler", "denoise")

    (hidden block)

    h.logger.Infof(ctx, "Handling request: %+v", req)

    g, gCtx := errgroup.WithContext(ctx)
    audioChunks := make(chan []byte)
    responsesCh := denoise(ctx, h.backend.Denoise(gCtx, req, audioChunks), req)

    var speechSize int
    g.Go(func() error {
        defer close(audioChunks)
        var err error
        speechSize, err = h.processIncomingAudioChunks(gCtx, r, audioChunks)
        return err
    })
    g.Go(func() error {
        return h.processBackendResponses(gCtx, w, responsesCh)
    })
    err = g.Wait() // err always "context cancelled"
    if err != nil {
        h.handleDenoiseError(ctx, err, w)
        return
    } else {
        h.logger.Info(ctx, "Finished request successfully")
        h.metrics.SuccessfulRequests.With(h.metricLabels).Inc()
    }

    billingCtx := util.ContextWithDenoiseBilling(ctx, speechSize)
    h.logger.Info(billingCtx, "Billing info")
}


This is denoise() Where the response is received from the backend and the response is resampled to the original sample rate and the header is attached

// resemple to source sampleRate and add WAV header.
func denoise( //nolint: funlen, gocognit
  ctx context.Context, resp <-chan *denoiser.Response, req *denoiser.Request,
) <-chan *denoiser.Response {
  response := make(chan *denoiser.Response)

  decodedAudio := make(chan []byte)
  rawData := make(chan []byte)
  ok := true
  eg, gCtx := errgroup.WithContext(ctx)

  eg.Go(func() error {
    decodedResults := audio.Decode(
      gCtx,
      rawData,
      audio.FromEncoding(audio.PCM_s16le),
      audio.FromSampleRate(denoiser.NativeSampleRate),
      audio.ToSampleRate(req.SampleRate),
      audio.TargetChannels(1),
    )
    var (
      res *audio.Result
      ok  bool
    )

    for {
      select {
      case <-gCtx.Done():
        return gCtx.Err()
      case res, ok = <-decodedResults:
      }

      if !ok {
        break
      }

      if res.Err != nil {
        return &denoiser.AudioDecoderError{}
      }

      select {
      case <-gCtx.Done():
        return gCtx.Err()
      case decodedAudio <- res.Data[0]:
      }
    }
    close(decodedAudio)
    return nil
  })

  data := &denoiser.Response{}
  eg.Go(func() error {
    for {
      data = &denoiser.Response{}
      select {
      case <-gCtx.Done():
        return gCtx.Err()
      case data, ok = <-resp:
      }
      if ok {
        rawData <- data.Audio
      } else {
        close(rawData)
        return nil
      }
    }
  })
  eg.Go(func() error {
    chunk := new(denoiser.Response)

    // header generating
    chunk.Audio = audio.GenerateHeaderWav(audio.WavPCM, 1, uint32(req.SampleRate), 2, 16)
    response <- chunk

    for {
      chunk = new(denoiser.Response)
      select {
      case <-gCtx.Done():
        return gCtx.Err()
      case chunk.Audio, ok = <-decodedAudio:
      }
      if !ok {
        return nil
      }
      response <- chunk
    }
  })

  go func() {
    if err := eg.Wait(); err != nil { // !
      select {
      case <-ctx.Done():
      case response <- &denoiser.Response{Err: err}:
      }
      return
    }
    close(response)
  }()
  return response
}


Here is my processIncomingAudioChunks()
func (h *Handler) processIncomingAudioChunks(
  ctx context.Context, r *http.Request, audio chan<- []byte,
) (int, error) {
  var err error
  n := 0
  speechSize := 0

  for {
    audioChunk := make([]byte, chunkSize)
    err = contextutil.DoWithTimeout(ctx, func() error {
      var err error
      n, err = r.Body.Read(audioChunk)
      return err
    }, h.cfg.ReadTimeout.ToNative())

    if err == io.EOF {
      if n == 0 {
        return speechSize, nil
      }
    } else if err != nil {
      return speechSize, err
    }

    if n > 0 {
      select {
      case <-ctx.Done():
        return speechSize, ctx.Err()
      case audio <- audioChunk[:n]:
        speechSize += n
      }
    }
  }
}


I'm stuck with this problem

Answer the question

In order to leave comments, you need to log in

1 answer(s)
N
nakem, 2021-11-01
@nakem

Solved a problem! It's all about the auto-generated mock. The `h.backend.Denoise()` mock did not read from the audioChunks channel. Therefore, there was a deadlock and the corresponding cancellation of the context. And if you add a buffer to the channel, then the program did not get stuck. For me, the main question was why the program returned StatusOK if there were problems with reading from the channel, but I forgot that I myself added the return of slice bytes from the mock, so the program worked. Thanks to everyone who wanted to help!

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question