Skip to content

internal/delegatingresolver: avoid proxy if networktype of target address is not tcp #8215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
751dd5e
no proxy if not tcp
eshitachandwani Apr 2, 2025
4476246
endpoint check
eshitachandwani Apr 2, 2025
39d3ded
passing tests
eshitachandwani Apr 2, 2025
fcbc507
test
eshitachandwani Apr 2, 2025
618877a
correct test
eshitachandwani Apr 2, 2025
d150d8e
Merge branch 'master' into d_error
eshitachandwani Apr 2, 2025
325e8cb
merge conflicts
eshitachandwani Apr 2, 2025
f2a6133
check
eshitachandwani Apr 7, 2025
8742ea7
var change
eshitachandwani Apr 7, 2025
607ec91
var change
eshitachandwani Apr 7, 2025
d254b04
refactor
eshitachandwani Apr 11, 2025
cfe7e4f
comments
eshitachandwani Apr 14, 2025
e8e8438
comments
eshitachandwani Apr 14, 2025
b941b53
proxy resolver
eshitachandwani Apr 21, 2025
fc393ba
remove bool
eshitachandwani Apr 21, 2025
00c62de
make channel buffered
eshitachandwani Apr 21, 2025
a98a62f
make channel buffered
eshitachandwani Apr 21, 2025
2d449cb
fix non tcp after tcp
eshitachandwani Apr 23, 2025
0f831a1
rough
eshitachandwani Apr 23, 2025
aa60db2
correct test
eshitachandwani Apr 24, 2025
854daca
correct test
eshitachandwani Apr 24, 2025
d4942ba
correct test
eshitachandwani Apr 24, 2025
821475e
correct test
eshitachandwani Apr 24, 2025
a3fa521
refactor
eshitachandwani Apr 24, 2025
9b384af
refactor
eshitachandwani Apr 24, 2025
8f157d8
blank line
eshitachandwani Apr 24, 2025
94c1f47
blank line
eshitachandwani Apr 24, 2025
7c5ad78
test
eshitachandwani Apr 25, 2025
1aa9f1c
test
eshitachandwani Apr 28, 2025
0049fa9
minor changes
eshitachandwani Apr 28, 2025
bf57c2c
assignment
eshitachandwani Apr 29, 2025
6e77ff3
minors
eshitachandwani Apr 30, 2025
fa78c83
minor
eshitachandwani May 2, 2025
1679375
goroutine order comment
eshitachandwani May 6, 2025
0a50ca5
comment
eshitachandwani May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 114 additions & 57 deletions internal/resolver/delegatingresolver/delegatingresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand All @@ -40,7 +42,7 @@

// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
// a intermediary between the gRPC ClientConn and the child resolvers.
// an intermediary between the gRPC ClientConn and the child resolvers.
//
// It implements the [resolver.Resolver] interface.
type delegatingResolver struct {
Expand All @@ -66,8 +68,8 @@

func (nopResolver) Close() {}

// proxyURLForTarget determines the proxy URL for the given address based on
// the environment. It can return the following:
// proxyURLForTarget determines the proxy URL for the given address based on the
// environment. It can return the following:
// - nil URL, nil error: No proxy is configured or the address is excluded
// using the `NO_PROXY` environment variable or if req.URL.Host is
// "localhost" (with or without // a port number)
Expand All @@ -86,7 +88,8 @@
// resolvers:
// - one to resolve the proxy address specified using the supported
// environment variables. This uses the registered resolver for the "dns"
// scheme.
// scheme. It is lazily built when a target resolver update contains at least
// one TCP address.
// - one to resolve the target URI using the resolver specified by the scheme
// in the target URI or specified by the user using the WithResolvers dial
// option. As a special case, if the target URI's scheme is "dns" and a
Expand All @@ -95,8 +98,10 @@
// resolution is enabled using the dial option.
func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
r := &delegatingResolver{
target: target,
cc: cc,
target: target,
cc: cc,
proxyResolver: nopResolver{},
targetResolver: nopResolver{},
}

var err error
Expand All @@ -123,37 +128,27 @@
// resolution should be handled by the proxy, not the client. Therefore, we
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
state := resolver.State{
r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: target.Endpoint()}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}},
}
r.targetResolverState = &state
} else {
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
}

if r.proxyResolver, err = r.proxyURIResolver(opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: failed to build resolver for proxy URL %q: %v", r.proxyURL, err)
// r.targetResolverState = &state
r.updateTargetResolverState(*r.targetResolverState)
return r, nil
}

if r.targetResolver == nil {
r.targetResolver = nopResolver{}
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.proxyResolver == nil {
r.proxyResolver = nopResolver{}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)

Check warning on line 144 in internal/resolver/delegatingresolver/delegatingresolver.go

View check run for this annotation

Codecov / codecov/patch

internal/resolver/delegatingresolver/delegatingresolver.go#L144

Added line #L144 was not covered by tests
}
return r, nil
}

// proxyURIResolver creates a resolver for resolving proxy URIs using the
// "dns" scheme. It adjusts the proxyURL to conform to the "dns:///" format and
// builds a resolver with a wrappingClientConn to capture resolved addresses.
// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
// a resolver with a wrappingClientConn to capture resolved addresses.
func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
proxyBuilder := resolver.Get("dns")
if proxyBuilder == nil {
Expand Down Expand Up @@ -189,18 +184,43 @@
r.proxyResolver = nil
}

// updateClientConnStateLocked creates a list of combined addresses by
// pairing each proxy address with every target address. For each pair, it
// generates a new [resolver.Address] using the proxy address, and adding the
// target address as the attribute along with user info. It returns nil if
// either resolver has not sent update even once and returns the error from
// ClientConn update once both resolvers have sent update atleast once.
func networkTypeFromAddr(addr resolver.Address) string {
networkType, ok := networktype.Get(addr)
if !ok {
networkType, _ = transport.ParseDialTarget(addr.Addr)
}
return networkType
}

func isTCPAddressPresent(state *resolver.State) bool {
for _, addr := range state.Addresses {
if networkType := networkTypeFromAddr(addr); networkType == "tcp" {
return true
}
}
for _, endpoint := range state.Endpoints {
for _, addr := range endpoint.Addresses {
if networktype := networkTypeFromAddr(addr); networktype == "tcp" {
return true
}
}
}
return false
}

// updateClientConnStateLocked constructs a combined list of addresses by
// pairing each proxy address with every target address of type TCP. For each
// pair, it creates a new [resolver.Address] using the proxy address and
// attaches the corresponding target address and user info as attributes. Target
// addresses that are not of type TCP are appended to the list as-is. The
// function returns nil if either resolver has not yet provided an update, and
// returns the result of ClientConn.UpdateState once both resolvers have
// provided at least one update.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
return nil
}

curState := *r.targetResolverState
// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
Expand All @@ -218,24 +238,34 @@
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addresses = append(addresses, targetAddr)
continue
}
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}

// Create a list of combined endpoints by pairing all proxy endpoints
// with every target endpoint. Each time, it constructs a new
// [resolver.Endpoint] using the all addresses from all the proxy endpoint
// and the target addresses from one endpoint. The target address and user
// information from the proxy URL are added as attributes to the proxy
// address.The resulting list of addresses is then grouped into endpoints,
// covering all combinations of proxy and target endpoints.
// Create a list of combined endpoints by pairing all proxy endpoints with
// every target endpoint. Each time, it constructs a new [resolver.Endpoint]
// using the all addresses from all the proxy endpoint and the target
// addresses from one endpoint. The target address and user information from
// the proxy URL are added as attributes to the proxy address.The resulting
// list of addresses is then grouped into endpoints, covering all
// combinations of proxy and target endpoints.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of repetition between this comment and the one at the top of this method. Can you make this one more concise?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
var addrs []resolver.Address
for _, proxyAddr := range r.proxyAddrs {
for _, targetAddr := range endpt.Addresses {
for _, targetAddr := range endpt.Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addrs = append(addrs, targetAddr)
continue
}
for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
Expand All @@ -246,8 +276,9 @@
}
// Use the targetResolverState for its service config and attributes
// contents. The state update is only sent after both the target and proxy
// resolvers have sent their updates, and curState has been updated with
// the combined addresses.
// resolvers have sent their updates, and curState has been updated with the
// combined addresses.
curState := *r.targetResolverState
curState.Addresses = addresses
curState.Endpoints = endpoints
return r.cc.UpdateState(curState)
Expand All @@ -257,16 +288,17 @@
// addresses and endpoints, marking the resolver as ready, and triggering a
// state update if both proxy and target resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the target resolver. It
// is a StateListener function of wrappingClientConn passed to the proxy resolver.
// is a StateListener function of wrappingClientConn passed to the proxy
// resolver.
func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
if logger.V(2) {
logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
}
if len(state.Endpoints) > 0 {
// We expect exactly one address per endpoint because the proxy
// resolver uses "dns" resolution.
// We expect exactly one address per endpoint because the proxy resolver
// uses "dns" resolution.
r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
for _, endpoint := range state.Endpoints {
r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
Expand Down Expand Up @@ -294,11 +326,14 @@
return err
}

// updateTargetResolverState updates the target resolver state by storing target
// addresses, endpoints, and service config, marking the resolver as ready, and
// triggering a state update if both resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It
// is a StateListener function of wrappingClientConn passed to the target resolver.
// updateTargetResolverState is the StateListener function provided to the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this StateListener being mentioned in a couple of places. There is no such method on the resolver.ClientConn interface.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a method on the wrappingClientConn , used to call different update state functions for proxy and target resolver. Should that not be mentioned in the comments? or is the use not clear ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any method names StateListener on the wrappingClientConn. Am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/grpc/grpc-go/pull/8215/files#r2073756977 , I see the stateListener function here, or am I misunderstanding the comment ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Ok, I was searching for StateListener, while the field is stateListener. Case sensitive :)

// target resolver via wrappingClientConn. It updates the resolver state and
// marks the target resolver as ready. If the update includes at least one TCP
// address and the proxy resolver has not yet been constructed, it initializes
// the proxy resolver. A combined state update is triggered once both resolvers
// are ready. If all addresses are non-TCP, it proceeds without waiting for the
// proxy resolver. If ClientConn.UpdateState returns a non-nil error,
// ResolveNow() is called on the proxy resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -307,6 +342,27 @@
logger.Infof("Addresses received from target resolver: %v", state.Addresses)
}
r.targetResolverState = &state
// If no addresses returned by resolver have network type as tcp , do not
// wait for proxy update.
if !isTCPAddressPresent(r.targetResolverState) {
return r.cc.UpdateState(*r.targetResolverState)
}

if len(r.proxyAddrs) == 0 {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if _, ok := r.proxyResolver.(nopResolver); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can invert the check to return early, reducing one level of indentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! Done!

proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
if err != nil {
r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
return
}

Check warning on line 360 in internal/resolver/delegatingresolver/delegatingresolver.go

View check run for this annotation

Codecov / codecov/patch

internal/resolver/delegatingresolver/delegatingresolver.go#L358-L360

Added lines #L358 - L360 were not covered by tests
r.proxyResolver = proxyResolver
}
}()
}

err := r.updateClientConnStateLocked()
if err != nil {
go func() {
Expand Down Expand Up @@ -335,7 +391,8 @@
return wcc.stateListener(state)
}

// ReportError intercepts errors from the child resolvers and passes them to ClientConn.
// ReportError intercepts errors from the child resolvers and passes them to
// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
Expand All @@ -346,8 +403,8 @@
wcc.UpdateState(resolver.State{Addresses: addrs})
}

// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
// ParseServiceConfig parses the provided service config and returns an object
// that provides the parsed config.
func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
}
Loading