diff --git a/stream.go b/stream.go index 01e66c1ed88f..aa693f356470 100644 --- a/stream.go +++ b/stream.go @@ -542,6 +542,8 @@ type clientStream struct { sentLast bool // sent an end stream + recvMsg bool // received msg from server + methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1134,11 +1136,17 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } + if cs.desc.ClientStreams && !cs.desc.ServerStreams && !cs.recvMsg{ + return status.Errorf(codes.Internal, "client streaming cardinality violation") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + if cs.desc.ClientStreams { + cs.recvMsg = true + } if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index a425877155e8..5015dd67d595 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3588,9 +3588,6 @@ func testClientStreamingError(t *testing.T, e env) { // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server doesn't send a message before returning status OK. func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { - // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` - // after this is fixed. - t.Skip() ss := &stubserver.StubServer{ StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { // Returning status OK without sending a response message.This is a