Skip to content

Commit

Permalink
fix spotless and license
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Sep 7, 2024
1 parent 129744a commit 8147bb0
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
static @NonNull CompletableFuture<AsyncOxiaClient> newInstance(@NonNull ClientConfig config) {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("oxia-client"));
final var oxiaBackoffProvider = OxiaBackoffProvider
.create(config.connectionBackoffMinDelay(), config.connectionBackoffMaxDelay());
final var oxiaBackoffProvider =
OxiaBackoffProvider.create(
config.connectionBackoffMinDelay(), config.connectionBackoffMaxDelay());
var stubManager =
new OxiaStubManager(config.namespace(), config.authentication(), config.enableTls(), oxiaBackoffProvider);
new OxiaStubManager(
config.namespace(), config.authentication(), config.enableTls(), oxiaBackoffProvider);

var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace());
var serviceAddrStub = stubManager.getStub(config.serviceAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.streamnative.oxia.client.api.Authentication;
import java.time.Duration;
import javax.annotation.Nullable;

import lombok.NonNull;

public record ClientConfig(
Expand All @@ -35,5 +34,4 @@ public record ClientConfig(
@Nullable Authentication authentication,
boolean enableTls,
@NonNull Duration connectionBackoffMinDelay,
@NonNull Duration connectionBackoffMaxDelay
) {}
@NonNull Duration connectionBackoffMaxDelay) {}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class OxiaClientBuilderImpl implements OxiaClientBuilder {
@NonNull protected Duration connectionBackoffMinDelay = Duration.ofMillis(100);
@NonNull protected Duration connectionBackoffMaxDelay = Duration.ofSeconds(30);


@Override
public @NonNull OxiaClientBuilder requestTimeout(@NonNull Duration requestTimeout) {
if (requestTimeout.isNegative() || requestTimeout.equals(ZERO)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.grpc;

import io.grpc.internal.BackoffPolicy;
import io.streamnative.oxia.client.util.Backoff;

import javax.annotation.concurrent.ThreadSafe;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public final class OxiaBackoffProvider implements BackoffPolicy.Provider {
public static final BackoffPolicy.Provider DEFAULT = new OxiaBackoffProvider(Backoff.DEFAULT_INITIAL_DELAY_MILLIS,
TimeUnit.MILLISECONDS, Backoff.DEFAULT_MAX_DELAY_SECONDS,
TimeUnit.MILLISECONDS);
public static final BackoffPolicy.Provider DEFAULT =
new OxiaBackoffProvider(
Backoff.DEFAULT_INITIAL_DELAY_MILLIS,
TimeUnit.MILLISECONDS,
Backoff.DEFAULT_MAX_DELAY_SECONDS,
TimeUnit.MILLISECONDS);
private final long initialDelay;
private final TimeUnit unitInitialDelay;
private final long maxDelay;
private final TimeUnit unitMaxDelay;

OxiaBackoffProvider(long initialDelay,
TimeUnit unitInitialDelay,
long maxDelay,
TimeUnit unitMaxDelay) {
OxiaBackoffProvider(
long initialDelay, TimeUnit unitInitialDelay, long maxDelay, TimeUnit unitMaxDelay) {
this.initialDelay = initialDelay;
this.unitInitialDelay = unitInitialDelay;
this.maxDelay = maxDelay;
Expand All @@ -33,7 +48,7 @@ public BackoffPolicy get() {
}

public static BackoffPolicy.Provider create(Duration minDelay, Duration maxDelay) {
return new OxiaBackoffProvider(minDelay.getNano(),
TimeUnit.NANOSECONDS, maxDelay.getNano(), TimeUnit.NANOSECONDS);
return new OxiaBackoffProvider(
minDelay.getNano(), TimeUnit.NANOSECONDS, maxDelay.getNano(), TimeUnit.NANOSECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public class OxiaStubManager implements AutoCloseable {
@Nullable private final BackoffPolicy.Provider backoffProvider;

public OxiaStubManager(
String namespace, @Nullable Authentication authentication, boolean enableTls,
String namespace,
@Nullable Authentication authentication,
boolean enableTls,
@Nullable BackoffPolicy.Provider backoffProvider) {
this.namespace = namespace;
this.authentication = authentication;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.streamnative.oxia.client.api.OptionBackoff;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.util.Backoff;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -85,8 +84,7 @@ public AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) {
optionBackoff.initDelay(),
optionBackoff.initDelayUnit(),
optionBackoff.maxDelay(),
optionBackoff.maxDelayUnit()
),
optionBackoff.maxDelayUnit()),
optionAutoRevalidate));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.streamnative.oxia.client.util;

import io.grpc.internal.BackoffPolicy;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,15 +32,11 @@ public Backoff() {
DEFAULT_INITIAL_DELAY_MILLIS,
TimeUnit.MILLISECONDS,
DEFAULT_MAX_DELAY_SECONDS,
TimeUnit.SECONDS
);
TimeUnit.SECONDS);
}

public Backoff(
long initialDelay,
TimeUnit unitInitialDelay,
long maxDelay,
TimeUnit unitMaxDelay) {
long initialDelay, TimeUnit unitInitialDelay, long maxDelay, TimeUnit unitMaxDelay) {
this.initialDelayMillis = unitInitialDelay.toMillis(initialDelay);
this.maxDelayMillis = unitMaxDelay.toMillis(maxDelay);
this.nextDelayMillis = initialDelayMillis;
Expand All @@ -62,7 +57,6 @@ public void reset() {
this.nextDelayMillis = initialDelayMillis;
}


@Override
public long nextBackoffNanos() {
return TimeUnit.MILLISECONDS.toNanos(nextDelayMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public void setUp() throws Exception {
new OxiaStub(
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
"default",
authentication, OxiaBackoffProvider.DEFAULT);
authentication,
OxiaBackoffProvider.DEFAULT);
clientByShardId = mock(OxiaStubProvider.class);
lenient().when(clientByShardId.getStubForShard(anyLong())).thenReturn(stub);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.grpc;

import io.grpc.stub.StreamObserver;
import io.streamnative.oxia.proto.GetRequest;
import io.streamnative.oxia.proto.ReadRequest;
import io.streamnative.oxia.proto.ReadResponse;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import lombok.Cleanup;
import lombok.SneakyThrows;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.implementation.bytecode.Throw;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.shadow.com.univocity.parsers.annotations.EnumOptions;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

@Testcontainers
@Slf4j
Expand All @@ -40,11 +47,9 @@ public enum BackoffType {
public void testOxiaReconnectBackoff(BackoffType type) throws Exception {
final OxiaStubManager stubManager;
if (type == BackoffType.Oxia) {
stubManager = new OxiaStubManager("default",
null, false, OxiaBackoffProvider.DEFAULT);
stubManager = new OxiaStubManager("default", null, false, OxiaBackoffProvider.DEFAULT);
} else {
stubManager = new OxiaStubManager("default",
null, false, null);
stubManager = new OxiaStubManager("default", null, false, null);
}

final OxiaStub stub = stubManager.getStub(oxia.getServiceAddress());
Expand All @@ -69,10 +74,9 @@ public void testOxiaReconnectBackoff(BackoffType type) throws Exception {

oxia.start();


startTime = System.currentTimeMillis();
elapse = 10 * 1000;
boolean success = false;
boolean success = false;
while (System.currentTimeMillis() - startTime <= elapse) {
try {
sendMessage(stub).join();
Expand All @@ -90,29 +94,31 @@ public void testOxiaReconnectBackoff(BackoffType type) throws Exception {
}

private static CompletableFuture<Void> sendMessage(OxiaStub stub) {
final var readRequest = ReadRequest.newBuilder()
.setShardId(0)
.addGets(GetRequest.newBuilder()
.setKey("test")
.build())
.build();
final var readRequest =
ReadRequest.newBuilder()
.setShardId(0)
.addGets(GetRequest.newBuilder().setKey("test").build())
.build();
final CompletableFuture<Void> f = new CompletableFuture<>();
stub.async().read(readRequest, new StreamObserver<>() {
@Override
public void onNext(ReadResponse value) {
f.complete(null);
}
stub.async()
.read(
readRequest,
new StreamObserver<>() {
@Override
public void onNext(ReadResponse value) {
f.complete(null);
}

@Override
public void onError(Throwable t) {
f.completeExceptionally(t);
}
@Override
public void onError(Throwable t) {
f.completeExceptionally(t);
}

@Override
public void onCompleted() {
f.complete(null);
}
});
@Override
public void onCompleted() {
f.complete(null);
}
});
return f;
}
}

0 comments on commit 8147bb0

Please sign in to comment.