Content-Length: 9927 | pFad | http://github.com/googleapis/google-cloud-java/pull/1001.patch
thub.com
From 0093ac3280521aa043c3af4f4c2406d3fce58356 Mon Sep 17 00:00:00 2001
From: Marco Ziccardi
Date: Wed, 11 May 2016 19:38:04 +0200
Subject: [PATCH] Make PubSub and PubSubRpc extends AutoCloseable, fix grpc
settings
---
.../java/com/google/cloud/pubsub/PubSub.java | 2 +-
.../com/google/cloud/pubsub/PubSubImpl.java | 5 ++
.../cloud/pubsub/spi/DefaultPubSubRpc.java | 83 ++++++++++++++-----
.../google/cloud/pubsub/spi/PubSubRpc.java | 5 +-
4 files changed, 74 insertions(+), 21 deletions(-)
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
index 6fde6f4425df..48de7002d54f 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
@@ -31,7 +31,7 @@
*
* @see Google Cloud Pub/Sub
*/
-public interface PubSub extends Service {
+public interface PubSub extends AutoCloseable, Service {
/**
* Class for specifying options for listing topics and subscriptions.
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index 19b2e5a35fec..bd69103b9819 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -278,4 +278,9 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti
Iterable ackIds) {
return null;
}
+
+ @Override
+ public void close() throws Exception {
+ rpc.close();
+ }
}
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
index d4f00fd7cf37..6ce27e3de56d 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
@@ -19,6 +19,8 @@
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiException;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.AuthCredentials;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSubException;
import com.google.cloud.pubsub.PubSubOptions;
@@ -27,8 +29,10 @@
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
@@ -50,48 +54,78 @@
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
+import io.grpc.ManagedChannel;
+import io.grpc.Status.Code;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
import org.joda.time.Duration;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
-
-import autovalue.shaded.com.google.common.common.collect.Sets;
-import io.grpc.Status.Code;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
public class DefaultPubSubRpc implements PubSubRpc {
private final PublisherApi publisherApi;
private final SubscriberApi subscriberApi;
+ private final ScheduledExecutorService executor =
+ MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(8));
public DefaultPubSubRpc(PubSubOptions options) throws IOException {
try {
- // Provide (and use a common thread-pool).
- // This depends on https://github.com/googleapis/gax-java/issues/73
- PublisherSettings.Builder pbuilder =
- PublisherSettings.defaultBuilder()
- .provideChannelWith(options.authCredentials().credentials())
- .applyToAllApiMethods(apiCallSettings(options));
- publisherApi = PublisherApi.create(pbuilder.build());
- SubscriberSettings.Builder sBuilder =
- SubscriberSettings.defaultBuilder()
- .provideChannelWith(options.authCredentials().credentials())
- .applyToAllApiMethods(apiCallSettings(options));
- subscriberApi = SubscriberApi.create(sBuilder.build());
+ PublisherSettings.Builder pubBuilder =
+ PublisherSettings.defaultBuilder().provideExecutorWith(executor, false);
+ SubscriberSettings.Builder subBuilder =
+ SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false);
+ // todo(mziccard): PublisherSettings should support null/absent credentials for testing
+ if (options.host().contains("localhost")
+ || options.authCredentials().equals(AuthCredentials.noAuth())) {
+ ManagedChannel channel = NettyChannelBuilder.forTarget(options.host())
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .build();
+ pubBuilder.provideChannelWith(channel, true);
+ subBuilder.provideChannelWith(channel, true);
+ } else {
+ GoogleCredentials credentials = options.authCredentials().credentials();
+ pubBuilder.provideChannelWith(
+ credentials.createScoped(PublisherSettings.DEFAULT_SERVICE_SCOPES));
+ subBuilder.provideChannelWith(
+ credentials.createScoped(SubscriberSettings.DEFAULT_SERVICE_SCOPES));
+ }
+ pubBuilder.applyToAllApiMethods(apiCallSettings(options));
+ subBuilder.applyToAllApiMethods(apiCallSettings(options));
+ publisherApi = PublisherApi.create(pubBuilder.build());
+ subscriberApi = SubscriberApi.create(subBuilder.build());
} catch (Exception ex) {
throw new IOException(ex);
}
}
+ private static long translateTimeout(long timeout) {
+ if (timeout < 0) {
+ return 20000;
+ } else if (timeout == 0) {
+ return Long.MAX_VALUE;
+ }
+ return timeout;
+ }
+
private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
// TODO: specify timeout these settings:
// retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
RetryParams retryParams = options.retryParams();
+ long connectTimeout = translateTimeout(options.connectTimeout());
+ long readTimeout = translateTimeout(options.readTimeout());
+ long maxTimeout = connectTimeout == Long.MAX_VALUE || readTimeout == Long.MAX_VALUE
+ ? Long.MAX_VALUE : connectTimeout + readTimeout;
final RetrySettings.Builder builder = RetrySettings.newBuilder()
.setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis()))
- .setInitialRpcTimeout(Duration.millis(options.connectTimeout()))
+ .setInitialRpcTimeout(Duration.millis(connectTimeout))
.setRpcTimeoutMultiplier(1.5)
- .setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout()))
+ .setMaxRpcTimeout(Duration.millis(maxTimeout))
.setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis()))
.setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor())
.setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis()));
@@ -117,7 +151,7 @@ public V apply(ApiException exception) {
@Override
public Future create(Topic topic) {
- // TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings
+ // TODO: it would be nice if we can get the idempotent information from the ApiCallSettings
// or from the exception
return translate(publisherApi.createTopicCallable().futureCall(topic), true);
}
@@ -149,7 +183,6 @@ public Future list(ListTopicSubscriptionsRequest
@Override
public Future delete(DeleteTopicRequest request) {
- // TODO: check if null is not going to work for Empty
return translate(publisherApi.deleteTopicCallable().futureCall(request), true,
Code.NOT_FOUND.value());
}
@@ -195,4 +228,16 @@ public Future pull(PullRequest request) {
public Future modify(ModifyPushConfigRequest request) {
return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false);
}
+
+ @Override
+ public ScheduledExecutorService executor() {
+ return executor;
+ }
+
+ @Override
+ public void close() throws Exception {
+ subscriberApi.close();
+ publisherApi.close();
+ executor.shutdown();
+ }
}
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java
index 8474ba042234..5af166a9f1ee 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java
@@ -38,8 +38,9 @@
import com.google.pubsub.v1.Topic;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
-public interface PubSubRpc {
+public interface PubSubRpc extends AutoCloseable {
// in all cases root cause of ExecutionException is PubSubException
Future create(Topic topic);
@@ -69,4 +70,6 @@ public interface PubSubRpc {
Future pull(PullRequest request);
Future modify(ModifyPushConfigRequest request);
+
+ ScheduledExecutorService executor();
}
--- a PPN by Garber Painting Akron. With Image Size Reduction included!Fetched URL: http://github.com/googleapis/google-cloud-java/pull/1001.patch
Alternative Proxies:
Alternative Proxy
pFad Proxy
pFad v3 Proxy
pFad v4 Proxy