diff options
author | Tony An <40644135+tonyjongyoonan@users.noreply.github.com> | 2023-06-22 10:46:34 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-22 10:46:34 -0700 |
commit | 2effae6249b1c916b01c61d254625aea49e8a599 (patch) | |
tree | 3d88d169cc4e43c3e6a8ac2f3007952345c35de7 | |
parent | f1de820c19f89427e339209c338d1f85e32995d2 (diff) | |
download | grpc-grpc-java-2effae6249b1c916b01c61d254625aea49e8a599.tar.gz |
okhttp: tsan socket data race bug fix (#10279)
replaced use of bareSocket with a synchronized socket, added additional lock to synchronize initialization with shutdown() to fix a Java bug
-rw-r--r-- | okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index f908e6a23..dae59f37a 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -92,10 +92,10 @@ final class OkHttpServerTransport implements ServerTransport, private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length"); private final Config config; - private final Socket bareSocket; private final Variant variant = new Http2(); private final TransportTracer tracer; private final InternalLogId logId; + private Socket socket; private ServerTransportListener listener; private Executor transportExecutor; private ScheduledExecutorService scheduledExecutorService; @@ -141,11 +141,11 @@ final class OkHttpServerTransport implements ServerTransport, public OkHttpServerTransport(Config config, Socket bareSocket) { this.config = Preconditions.checkNotNull(config, "config"); - this.bareSocket = Preconditions.checkNotNull(bareSocket, "bareSocket"); + this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket"); tracer = config.transportTracerFactory.create(); tracer.setFlowControlWindowReader(this::readFlowControlWindow); - logId = InternalLogId.allocate(getClass(), bareSocket.getRemoteSocketAddress().toString()); + logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString()); transportExecutor = config.transportExecutorPool.getObject(); scheduledExecutorService = config.scheduledExecutorServicePool.getObject(); keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls, @@ -161,10 +161,17 @@ final class OkHttpServerTransport implements ServerTransport, private void startIo(SerializingExecutor serializingExecutor) { try { - bareSocket.setTcpNoDelay(true); + // The socket implementation is lazily initialized, but had broken thread-safety + // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. + // As a workaround, we lock to synchronize initialization with shutdown(). + synchronized (lock) { + socket.setTcpNoDelay(true); + } HandshakerSocketFactory.HandshakeResult result = - config.handshakerSocketFactory.handshake(bareSocket, Attributes.EMPTY); - Socket socket = result.socket; + config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY); + synchronized (lock) { + this.socket = result.socket; + } this.attributes = result.attributes; int maxQueuedControlFrames = 10000; @@ -249,7 +256,7 @@ final class OkHttpServerTransport implements ServerTransport, log.log(Level.INFO, "Socket failed to handshake", ex); } } - GrpcUtil.closeQuietly(bareSocket); + GrpcUtil.closeQuietly(socket); terminated(); } } @@ -268,7 +275,7 @@ final class OkHttpServerTransport implements ServerTransport, this.gracefulShutdownPeriod = gracefulShutdownPeriod; if (frameWriter == null) { handshakeShutdown = true; - GrpcUtil.closeQuietly(bareSocket); + GrpcUtil.closeQuietly(socket); } else { // RFC7540 ยง6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but // we also set a timer to limit the upper bound in case the PING is excessively stalled or @@ -309,7 +316,7 @@ final class OkHttpServerTransport implements ServerTransport, synchronized (lock) { if (frameWriter == null) { handshakeShutdown = true; - GrpcUtil.closeQuietly(bareSocket); + GrpcUtil.closeQuietly(socket); return; } } @@ -360,7 +367,7 @@ final class OkHttpServerTransport implements ServerTransport, private void triggerForcefulClose() { // Safe to do unconditionally; no need to check if timer cancellation raced - GrpcUtil.closeQuietly(bareSocket); + GrpcUtil.closeQuietly(socket); } private void terminated() { @@ -396,9 +403,9 @@ final class OkHttpServerTransport implements ServerTransport, synchronized (lock) { return Futures.immediateFuture(new InternalChannelz.SocketStats( tracer.getStats(), - bareSocket.getLocalSocketAddress(), - bareSocket.getRemoteSocketAddress(), - Utils.getSocketOptions(bareSocket), + socket.getLocalSocketAddress(), + socket.getRemoteSocketAddress(), + Utils.getSocketOptions(socket), securityInfo)); } } @@ -593,12 +600,12 @@ final class OkHttpServerTransport implements ServerTransport, } finally { // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket try { - GrpcUtil.exhaust(bareSocket.getInputStream()); + GrpcUtil.exhaust(socket.getInputStream()); } catch (IOException ex) { // Unable to wait, so just proceed to tear-down. The socket is probably already closed so // the GOAWAY can't be sent anyway. } - GrpcUtil.closeQuietly(bareSocket); + GrpcUtil.closeQuietly(socket); terminated(); Thread.currentThread().setName(threadName); } @@ -1108,7 +1115,7 @@ final class OkHttpServerTransport implements ServerTransport, synchronized (lock) { goAwayStatus = Status.UNAVAILABLE .withDescription("Keepalive failed. Considering connection dead"); - GrpcUtil.closeQuietly(bareSocket); + GrpcUtil.closeQuietly(socket); } } } |