Skip to content

Add Client Metadata Update Support. #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,7 @@ testlogger {
showFailedStandardStreams = true
logLevel = LogLevel.LIFECYCLE
}

dependencies {
testImplementation(libs.assertj)
}
Comment on lines +109 to +111
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssertJ has been used in this PR and added to test-base as it is a useful library that could be shared across all modules.

Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ private ServerTuple(final ClusterableServer server, final ServerDescription desc
}
}

AbstractMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory) {
super(clusterId, settings, serverFactory);
AbstractMultiServerCluster(final ClusterId clusterId,
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
super(clusterId, settings, serverFactory, clientMetadata);
isTrue("connection mode is multiple", settings.getMode() == MULTIPLE);
clusterType = settings.getRequiredClusterType();
replicaSetName = settings.getRequiredReplicaSetName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrue;
Expand Down Expand Up @@ -101,26 +101,36 @@ abstract class BaseCluster implements Cluster {
private final ClusterListener clusterListener;
private final Deque<ServerSelectionRequest> waitQueue = new ConcurrentLinkedDeque<>();
private final ClusterClock clusterClock = new ClusterClock();
private final ClientMetadata clientMetadata;
private Thread waitQueueHandler;

private volatile boolean isClosed;
private volatile ClusterDescription description;

BaseCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory) {
BaseCluster(final ClusterId clusterId,
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
this.clusterId = notNull("clusterId", clusterId);
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
this.clusterListener = singleClusterListener(settings);
clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
this.clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
this.description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
settings, serverFactory.getSettings());
this.clientMetadata = clientMetadata;
}

@Override
public ClusterClock getClock() {
return clusterClock;
}

@Override
public ClientMetadata getClientMetadata() {
return clientMetadata;
}

@Override
public ServerTuple selectServer(final ServerSelector serverSelector, final OperationContext operationContext) {
isTrue("open", !isClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,31 @@ static boolean clientMetadataDocumentTooLarge(final BsonDocument document) {
return buffer.getPosition() > MAXIMUM_CLIENT_METADATA_ENCODED_SIZE;
}

public static BsonDocument updateClientMedataDocument(final BsonDocument clientMetadataDocument,
final MongoDriverInformation mongoDriverInformation) {
BsonDocument updatedClientMetadataDocument = clientMetadataDocument.clone();
BsonDocument driverInformation = clientMetadataDocument.getDocument("driver");

MongoDriverInformation.Builder builder = MongoDriverInformation.builder(mongoDriverInformation)
.driverName(driverInformation.getString("name").getValue())
.driverVersion(driverInformation.getString("version").getValue());

if (updatedClientMetadataDocument.containsKey("platform")) {
builder.driverPlatform(updatedClientMetadataDocument.getString("platform").getValue());
}

MongoDriverInformation updatedDriverInformation = builder.build();

tryWithLimit(updatedClientMetadataDocument, d -> {
putAtPath(d, "driver.name", listToString(updatedDriverInformation.getDriverNames()));
putAtPath(d, "driver.version", listToString(updatedDriverInformation.getDriverVersions()));
});
tryWithLimit(updatedClientMetadataDocument, d -> {
putAtPath(d, "platform", listToString(updatedDriverInformation.getDriverPlatforms()));
});
return updatedClientMetadataDocument;
}

public enum ContainerRuntime {
DOCKER("docker") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public interface Cluster extends Closeable {
*/
ClusterClock getClock();

ClientMetadata getClientMetadata();

ServerTuple selectServer(ServerSelector serverSelector, OperationContext operationContext);

void selectServerAsync(ServerSelector serverSelector, OperationContext operationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,29 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
InternalOperationContextFactory heartBeatOperationContextFactory =
new InternalOperationContextFactory(heartbeatTimeoutSettings, serverApi);

ClientMetadata clientMetadata = new ClientMetadata(
applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build());

if (clusterSettings.getMode() == ClusterConnectionMode.LOAD_BALANCED) {
ClusterableServerFactory serverFactory = new LoadBalancedClusterableServerFactory(serverSettings,
connectionPoolSettings, internalConnectionPoolSettings, streamFactory, credential, loggerSettings, commandListener,
applicationName, mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(),
compressorList, serverApi, clusterOperationContextFactory);
return new LoadBalancedCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory);
return new LoadBalancedCluster(clusterId, clusterSettings, serverFactory, clientMetadata, dnsSrvRecordMonitorFactory);
} else {
ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(serverSettings,
connectionPoolSettings, internalConnectionPoolSettings,
clusterOperationContextFactory, streamFactory, heartBeatOperationContextFactory, heartbeatStreamFactory, credential,
loggerSettings, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
loggerSettings, commandListener, compressorList,
serverApi, FaasEnvironment.getFaasEnvironment() != FaasEnvironment.UNKNOWN);

if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);
return new SingleServerCluster(clusterId, clusterSettings, serverFactory, clientMetadata);
} else if (clusterSettings.getMode() == ClusterConnectionMode.MULTIPLE) {
if (clusterSettings.getSrvHost() == null) {
return new MultiServerCluster(clusterId, clusterSettings, serverFactory);
return new MultiServerCluster(clusterId, clusterSettings, serverFactory, clientMetadata);
} else {
return new DnsMultiServerCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory);
return new DnsMultiServerCluster(clusterId, clusterSettings, serverFactory, clientMetadata, dnsSrvRecordMonitorFactory);
}
} else {
throw new UnsupportedOperationException("Unsupported cluster mode: " + clusterSettings.getMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.mongodb.LoggerSettings;
import com.mongodb.MongoCompressor;
import com.mongodb.MongoCredential;
import com.mongodb.MongoDriverInformation;
import com.mongodb.ServerAddress;
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterConnectionMode;
Expand Down Expand Up @@ -50,8 +49,6 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory
private final MongoCredentialWithCache credential;
private final LoggerSettings loggerSettings;
private final CommandListener commandListener;
private final String applicationName;
private final MongoDriverInformation mongoDriverInformation;
private final List<MongoCompressor> compressorList;
@Nullable
private final ServerApi serverApi;
Expand All @@ -63,8 +60,7 @@ public DefaultClusterableServerFactory(
final InternalOperationContextFactory clusterOperationContextFactory, final StreamFactory streamFactory,
final InternalOperationContextFactory heartbeatOperationContextFactory, final StreamFactory heartbeatStreamFactory,
@Nullable final MongoCredential credential, final LoggerSettings loggerSettings,
@Nullable final CommandListener commandListener, @Nullable final String applicationName,
@Nullable final MongoDriverInformation mongoDriverInformation,
@Nullable final CommandListener commandListener,
final List<MongoCompressor> compressorList, @Nullable final ServerApi serverApi, final boolean isFunctionAsAServiceEnvironment) {
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
Expand All @@ -76,8 +72,6 @@ public DefaultClusterableServerFactory(
this.credential = credential == null ? null : new MongoCredentialWithCache(credential);
this.loggerSettings = loggerSettings;
this.commandListener = commandListener;
this.applicationName = applicationName;
this.mongoDriverInformation = mongoDriverInformation;
this.compressorList = compressorList;
this.serverApi = serverApi;
this.isFunctionAsAServiceEnvironment = isFunctionAsAServiceEnvironment;
Expand All @@ -88,15 +82,17 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve
ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress);
ClusterConnectionMode clusterMode = cluster.getSettings().getMode();
SameObjectProvider<SdamServerDescriptionManager> sdamProvider = SameObjectProvider.uninitialized();
ClientMetadata clientMetadata = cluster.getClientMetadata();

ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings,
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), loggerSettings, null, serverApi),
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, clientMetadata,
emptyList(), loggerSettings, null, serverApi),
clusterMode, serverApi, isFunctionAsAServiceEnvironment, sdamProvider, heartbeatOperationContextFactory);

ConnectionPool connectionPool = new DefaultConnectionPool(serverId,
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi),
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, clientMetadata,
compressorList, loggerSettings, commandListener, serverApi),
connectionPoolSettings, internalConnectionPoolSettings, sdamProvider, clusterOperationContextFactory);
ServerListener serverListener = singleServerListener(serverSettings);
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverListener, serverMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ public final class DnsMultiServerCluster extends AbstractMultiServerCluster {
private final DnsSrvRecordMonitor dnsSrvRecordMonitor;
private volatile MongoException srvResolutionException;

public DnsMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory,
public DnsMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata,
final DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory) {
super(clusterId, settings, serverFactory);
super(clusterId, settings, serverFactory, clientMetadata);
dnsSrvRecordMonitor = dnsSrvRecordMonitorFactory.create(assertNotNull(settings.getSrvHost()), settings.getSrvServiceName(),
new DnsSrvRecordInitializer() {
private volatile boolean initialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,21 @@
import com.mongodb.AuthenticationMechanism;
import com.mongodb.LoggerSettings;
import com.mongodb.MongoCompressor;
import com.mongodb.MongoDriverInformation;
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ServerId;
import com.mongodb.event.CommandListener;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;

import java.util.List;

import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;

class InternalStreamConnectionFactory implements InternalConnectionFactory {
private final ClusterConnectionMode clusterConnectionMode;
private final boolean isMonitoringConnection;
private final StreamFactory streamFactory;
private final BsonDocument clientMetadataDocument;
private final ClientMetadata clientMetadata;
private final List<MongoCompressor> compressorList;
private final LoggerSettings loggerSettings;
private final CommandListener commandListener;
Expand All @@ -47,17 +44,17 @@ class InternalStreamConnectionFactory implements InternalConnectionFactory {
InternalStreamConnectionFactory(final ClusterConnectionMode clusterConnectionMode,
final StreamFactory streamFactory,
@Nullable final MongoCredentialWithCache credential,
@Nullable final String applicationName, @Nullable final MongoDriverInformation mongoDriverInformation,
final ClientMetadata clientMetadata,
final List<MongoCompressor> compressorList,
final LoggerSettings loggerSettings, @Nullable final CommandListener commandListener, @Nullable final ServerApi serverApi) {
this(clusterConnectionMode, false, streamFactory, credential, applicationName, mongoDriverInformation, compressorList,
this(clusterConnectionMode, false, streamFactory, credential, clientMetadata, compressorList,
loggerSettings, commandListener, serverApi);
}

InternalStreamConnectionFactory(final ClusterConnectionMode clusterConnectionMode, final boolean isMonitoringConnection,
final StreamFactory streamFactory,
@Nullable final MongoCredentialWithCache credential,
@Nullable final String applicationName, @Nullable final MongoDriverInformation mongoDriverInformation,
final ClientMetadata clientMetadata,
final List<MongoCompressor> compressorList,
final LoggerSettings loggerSettings, @Nullable final CommandListener commandListener, @Nullable final ServerApi serverApi) {
this.clusterConnectionMode = clusterConnectionMode;
Expand All @@ -67,15 +64,15 @@ class InternalStreamConnectionFactory implements InternalConnectionFactory {
this.loggerSettings = loggerSettings;
this.commandListener = commandListener;
this.serverApi = serverApi;
this.clientMetadataDocument = createClientMetadataDocument(applicationName, mongoDriverInformation);
this.clientMetadata = clientMetadata;
this.credential = credential;
}

@Override
public InternalConnection create(final ServerId serverId, final ConnectionGenerationSupplier connectionGenerationSupplier) {
Authenticator authenticator = credential == null ? null : createAuthenticator(credential);
InternalStreamConnectionInitializer connectionInitializer = new InternalStreamConnectionInitializer(
clusterConnectionMode, authenticator, clientMetadataDocument, compressorList, serverApi);
clusterConnectionMode, authenticator, clientMetadata.getClientMetadataBsonDocument(), compressorList, serverApi);
return new InternalStreamConnection(
clusterConnectionMode, authenticator,
isMonitoringConnection, serverId, connectionGenerationSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ final class LoadBalancedCluster implements Cluster {
private final ClusterId clusterId;
private final ClusterSettings settings;
private final ClusterClock clusterClock = new ClusterClock();
private final ClientMetadata clientMetadata;
private final ClusterListener clusterListener;
private ClusterDescription description;
@Nullable
Expand All @@ -91,6 +92,7 @@ final class LoadBalancedCluster implements Cluster {
private final Condition condition = lock.newCondition();

LoadBalancedCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata,
final DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory) {
assertTrue(settings.getMode() == ClusterConnectionMode.LOAD_BALANCED);
LOGGER.info(format("Cluster created with id %s and settings %s", clusterId, settings.getShortDescription()));
Expand All @@ -100,6 +102,7 @@ final class LoadBalancedCluster implements Cluster {
this.clusterListener = singleClusterListener(settings);
this.description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, emptyList(), settings,
serverFactory.getSettings());
this.clientMetadata = clientMetadata;

if (settings.getSrvHost() == null) {
dnsSrvRecordMonitor = null;
Expand Down Expand Up @@ -204,6 +207,11 @@ public ClusterClock getClock() {
return clusterClock;
}

@Override
public ClientMetadata getClientMetadata() {
return clientMetadata;
}

@Override
public ServerTuple selectServer(final ServerSelector serverSelector, final OperationContext operationContext) {
isTrue("open", !isClosed());
Expand Down
Loading