aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony An <40644135+tonyjongyoonan@users.noreply.github.com>2023-06-22 10:46:34 -0700
committerGitHub <noreply@github.com>2023-06-22 10:46:34 -0700
commit2effae6249b1c916b01c61d254625aea49e8a599 (patch)
tree3d88d169cc4e43c3e6a8ac2f3007952345c35de7
parentf1de820c19f89427e339209c338d1f85e32995d2 (diff)
downloadgrpc-grpc-java-2effae6249b1c916b01c61d254625aea49e8a599.tar.gz
okhttp: tsan socket data race bug fix (#10279)
replaced use of bareSocket with a synchronized socket, added additional lock to synchronize initialization with shutdown() to fix a Java bug
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java39
1 files changed, 23 insertions, 16 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
index f908e6a23..dae59f37a 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
@@ -92,10 +92,10 @@ final class OkHttpServerTransport implements ServerTransport,
private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
private final Config config;
- private final Socket bareSocket;
private final Variant variant = new Http2();
private final TransportTracer tracer;
private final InternalLogId logId;
+ private Socket socket;
private ServerTransportListener listener;
private Executor transportExecutor;
private ScheduledExecutorService scheduledExecutorService;
@@ -141,11 +141,11 @@ final class OkHttpServerTransport implements ServerTransport,
public OkHttpServerTransport(Config config, Socket bareSocket) {
this.config = Preconditions.checkNotNull(config, "config");
- this.bareSocket = Preconditions.checkNotNull(bareSocket, "bareSocket");
+ this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
tracer = config.transportTracerFactory.create();
tracer.setFlowControlWindowReader(this::readFlowControlWindow);
- logId = InternalLogId.allocate(getClass(), bareSocket.getRemoteSocketAddress().toString());
+ logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
transportExecutor = config.transportExecutorPool.getObject();
scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
@@ -161,10 +161,17 @@ final class OkHttpServerTransport implements ServerTransport,
private void startIo(SerializingExecutor serializingExecutor) {
try {
- bareSocket.setTcpNoDelay(true);
+ // The socket implementation is lazily initialized, but had broken thread-safety
+ // for that laziness https://bugs.openjdk.org/browse/JDK-8278326.
+ // As a workaround, we lock to synchronize initialization with shutdown().
+ synchronized (lock) {
+ socket.setTcpNoDelay(true);
+ }
HandshakerSocketFactory.HandshakeResult result =
- config.handshakerSocketFactory.handshake(bareSocket, Attributes.EMPTY);
- Socket socket = result.socket;
+ config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
+ synchronized (lock) {
+ this.socket = result.socket;
+ }
this.attributes = result.attributes;
int maxQueuedControlFrames = 10000;
@@ -249,7 +256,7 @@ final class OkHttpServerTransport implements ServerTransport,
log.log(Level.INFO, "Socket failed to handshake", ex);
}
}
- GrpcUtil.closeQuietly(bareSocket);
+ GrpcUtil.closeQuietly(socket);
terminated();
}
}
@@ -268,7 +275,7 @@ final class OkHttpServerTransport implements ServerTransport,
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
if (frameWriter == null) {
handshakeShutdown = true;
- GrpcUtil.closeQuietly(bareSocket);
+ GrpcUtil.closeQuietly(socket);
} else {
// RFC7540 ยง6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
@@ -309,7 +316,7 @@ final class OkHttpServerTransport implements ServerTransport,
synchronized (lock) {
if (frameWriter == null) {
handshakeShutdown = true;
- GrpcUtil.closeQuietly(bareSocket);
+ GrpcUtil.closeQuietly(socket);
return;
}
}
@@ -360,7 +367,7 @@ final class OkHttpServerTransport implements ServerTransport,
private void triggerForcefulClose() {
// Safe to do unconditionally; no need to check if timer cancellation raced
- GrpcUtil.closeQuietly(bareSocket);
+ GrpcUtil.closeQuietly(socket);
}
private void terminated() {
@@ -396,9 +403,9 @@ final class OkHttpServerTransport implements ServerTransport,
synchronized (lock) {
return Futures.immediateFuture(new InternalChannelz.SocketStats(
tracer.getStats(),
- bareSocket.getLocalSocketAddress(),
- bareSocket.getRemoteSocketAddress(),
- Utils.getSocketOptions(bareSocket),
+ socket.getLocalSocketAddress(),
+ socket.getRemoteSocketAddress(),
+ Utils.getSocketOptions(socket),
securityInfo));
}
}
@@ -593,12 +600,12 @@ final class OkHttpServerTransport implements ServerTransport,
} finally {
// Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
try {
- GrpcUtil.exhaust(bareSocket.getInputStream());
+ GrpcUtil.exhaust(socket.getInputStream());
} catch (IOException ex) {
// Unable to wait, so just proceed to tear-down. The socket is probably already closed so
// the GOAWAY can't be sent anyway.
}
- GrpcUtil.closeQuietly(bareSocket);
+ GrpcUtil.closeQuietly(socket);
terminated();
Thread.currentThread().setName(threadName);
}
@@ -1108,7 +1115,7 @@ final class OkHttpServerTransport implements ServerTransport,
synchronized (lock) {
goAwayStatus = Status.UNAVAILABLE
.withDescription("Keepalive failed. Considering connection dead");
- GrpcUtil.closeQuietly(bareSocket);
+ GrpcUtil.closeQuietly(socket);
}
}
}