-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add pipe receiver memory protection #18090
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import org.apache.iotdb.commons.audit.UserEntity; | ||
| import org.apache.iotdb.commons.conf.CommonDescriptor; | ||
| import org.apache.iotdb.commons.exception.IllegalPathException; | ||
| import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; | ||
| import org.apache.iotdb.commons.i18n.PipeMessages; | ||
| import org.apache.iotdb.commons.pipe.config.PipeConfig; | ||
| import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; | ||
|
|
@@ -386,7 +387,7 @@ protected final TPipeTransferResp handleTransferFilePiece( | |
| final PipeTransferFilePieceReq req, | ||
| final boolean isRequestThroughAirGap, | ||
| final boolean isSingleFile) { | ||
| try { | ||
| try (final AutoCloseable ignored = tryAllocateMemoryForFilePiece(req)) { | ||
| updateWritingFileIfNeeded(req.getFileName(), isSingleFile); | ||
|
|
||
| // If the request is through air gap, the sender will resend the file piece from the beginning | ||
|
|
@@ -419,6 +420,18 @@ protected final TPipeTransferResp handleTransferFilePiece( | |
| writingFileWriter.write(req.getFilePiece()); | ||
| return PipeTransferFilePieceResp.toTPipeTransferResp( | ||
| RpcUtils.SUCCESS_STATUS, writingFileWriter.length()); | ||
| } catch (final PipeRuntimeOutOfMemoryCriticalException e) { | ||
| final TSStatus status = | ||
| getReceiverTemporaryUnavailableStatus( | ||
| "receiving pipe file piece", getFilePieceSizeInBytes(req), e); | ||
| PipeLogger.log( | ||
| LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req); | ||
| try { | ||
| return PipeTransferFilePieceResp.toTPipeTransferResp( | ||
| status, PipeTransferFilePieceResp.ERROR_END_OFFSET); | ||
| } catch (Exception ex) { | ||
| return PipeTransferFilePieceResp.toTPipeTransferResp(status); | ||
| } | ||
|
Comment on lines
+432
to
+434
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The exception is not reflected in the status?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Applied in 9213e48: the returned status now keeps the receiver OOM context and includes the root-cause exception message instead of dropping it. |
||
| } catch (final Exception e) { | ||
| PipeLogger.log( | ||
| LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req); | ||
|
|
@@ -435,6 +448,28 @@ protected final TPipeTransferResp handleTransferFilePiece( | |
| } | ||
| } | ||
|
|
||
| protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req) | ||
| throws PipeRuntimeOutOfMemoryCriticalException { | ||
| return () -> {}; | ||
| } | ||
|
|
||
| protected TSStatus getReceiverTemporaryUnavailableStatus( | ||
| final String action, | ||
| final long requestedMemorySizeInBytes, | ||
| final PipeRuntimeOutOfMemoryCriticalException e) { | ||
| return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) | ||
| .setMessage( | ||
| String.format( | ||
| PipeMessages.RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT, | ||
| action, | ||
| requestedMemorySizeInBytes, | ||
| e.getMessage())); | ||
| } | ||
|
|
||
| private static long getFilePieceSizeInBytes(final PipeTransferFilePieceReq req) { | ||
| return req.getFilePiece() == null ? 0 : req.getFilePiece().length; | ||
| } | ||
|
|
||
| protected final void updateWritingFileIfNeeded(final String fileName, final boolean isSingleFile) | ||
| throws IOException { | ||
| if (isFileExistedAndNameCorrect(fileName)) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i18n
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied in 9213e48: the receiver OOM status text now uses the DataNode pipe i18n messages with English and Chinese entries.