HADOOP-15963. [ABFS] Add vectored read support in ABFS driver #8400
anmolanmol1234 wants to merge 35 commits into apache:trunk from HADOOP-15963_poc
Conversation
🎊 +1 overall
This message was automatically generated.
    return System.identityHashCode(range);
  }
}
@@ -1078,7 +1105,7 @@ public int minSeekForVectorReads() {
 */
@Override
public int maxReadSizeForVectorReads() {
the method mentions the read size but we're returning the max gap size

this is the name of the method in the superclass

But why are we returning the seek size here? Why not the actual read size?
  buffer.getPath(), r.getOffset(), destOffset, length, left);

if (left < 0) {
  LOG.error("fanOut: pending bytes went negative possible duplicate write:"

nit: non-printable character
if (end >= unit.getOffset() + unit.getLength()) {
  existing.setBufferType(BufferType.VECTORED);
  existing.addVectoredUnit(unit);
  existing.setAllocator(allocator);

could we have multiple readVectored calls with overlapping ranges that could reset the allocator here? Or won't the isAlreadyQueued section go through for overlapping ranges?

Overlapping ranges are not allowed in vectored reads; validateAndSortRanges in the VectoredReadUtils class takes care of this.
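The overlap check the reply refers to can be sketched in plain Java. This is a simplified stand-in for Hadoop's `VectoredReadUtils.validateAndSortRanges`, using a local `FileRange` record rather than the Hadoop class, so the shapes and names here are illustrative only:

```java
import java.util.Comparator;
import java.util.List;

/**
 * Simplified model of validateAndSortRanges: sort ranges by offset and
 * reject any pair that overlaps. Not the actual Hadoop implementation.
 */
public class RangeValidator {
  /** Minimal stand-in for org.apache.hadoop.fs.FileRange. */
  public record FileRange(long offset, int length) { }

  public static List<FileRange> validateAndSort(List<FileRange> ranges) {
    List<FileRange> sorted = ranges.stream()
        .sorted(Comparator.comparingLong(FileRange::offset))
        .toList();
    for (int i = 1; i < sorted.size(); i++) {
      FileRange prev = sorted.get(i - 1);
      // An overlap exists when the previous range extends past the next offset.
      if (prev.offset() + prev.length() > sorted.get(i).offset()) {
        throw new IllegalArgumentException(
            "Overlapping ranges are not allowed: " + prev + " and " + sorted.get(i));
      }
    }
    return sorted;
  }
}
```

Because every `readVectored` call passes through this validation before any buffer is queued, a second call with ranges overlapping an in-flight buffer's units is rejected up front rather than racing with the allocator.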
  return bufferManager;
}

VectoredReadHandler getVectoredReadHandler() {
long bufferEnd = bufferStart + bytesRead;

/* Iterate over all combined logical units mapped to this buffer */
for (CombinedFileRange unit : units) {

could the following scenario be possible: while we fan out here, another readVectored call comes in overlapping the ranges shared by this buffer and gets attached to the same buffer as a unit?

Overlapping ranges are not allowed; List<? extends FileRange> sortedRanges = VectoredReadUtils.validateAndSortRanges(ranges, Optional.of(fileLength)) takes care of that.
*/
if (isAlreadyQueued(stream.getETag(), unit.getOffset())) {
  ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset());
  if (existing != null && existing.getStream().getETag() != null && stream.getETag()

same doubt as RBMV1: do we wait for UNAVAILABLE-state read buffers too?

yes, addressed above
 * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
 */
public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
public static void setReadBufferManagerConfigs(int readAheadBlockSize,
💔 -1 overall
This message was automatically generated.
  return isFirstByteConsumed() && isLastByteConsumed();
}

void addVectoredUnit(CombinedFileRange u) {

Javadoc for all the newly created methods please
// Allocator used for vectored fan-out; captured at queue time
private IntFunction<ByteBuffer> allocator;
// Tracks whether fanOut has already been executed
private final AtomicInteger fanOutDone = new AtomicInteger(0);

Would it be better to keep fanOutDone as an AtomicBoolean instead of an AtomicInteger? We wouldn't have to compare the value in isFanOutDone() in that case.
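The suggestion above can be sketched as follows. `FanOutGuard` is a hypothetical class name, not from the PR; it shows how `AtomicBoolean.compareAndSet` makes the one-shot fan-out guard both race-free and comparison-free:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of guarding a one-shot fan-out with an AtomicBoolean.
 * compareAndSet(false, true) succeeds for exactly one caller, so the
 * body runs at most once even under concurrent invocation, and
 * isFanOutDone() reads the flag directly with no integer comparison.
 */
public class FanOutGuard {
  private final AtomicBoolean fanOutDone = new AtomicBoolean(false);

  /** Runs the fan-out body at most once; returns true if this call ran it. */
  public boolean fanOutOnce(Runnable body) {
    if (fanOutDone.compareAndSet(false, true)) {
      body.run();
      return true;
    }
    return false;
  }

  public boolean isFanOutDone() {
    return fanOutDone.get();
  }
}
```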
ReadBuffer findInList(final Collection<ReadBuffer> buffers,
    final AbfsInputStream stream, long requestedOffset) {
  for (ReadBuffer buffer : buffers) {
    if (buffer.getStream() == stream

can buffer be null? If yes, it will result in a NullPointerException here.

No, buffer can't be null here
@steveloughran @mukund-thakur requesting you to kindly review the PR. Thanks
steveloughran left a comment

Have a look at #7105 / HADOOP-19105 "Improve resilience in vector reads" (https://issues.apache.org/jira/browse/HADOOP-19105) to see why I now think trying to merge ranges is a PITA.

Do make sure that your error handling explicitly releases any allocated buffers before raising exceptions; the PositionedReadable interface was extended to support this.
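The release-before-raise pattern the review asks for can be illustrated with a plain-JDK sketch. The pool, the `readAll` loop, and the simulated failure below are hypothetical stand-ins, not the ABFS implementation; the point is only that every buffer allocated before the failure is returned before the exception propagates:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/**
 * Sketch: on any failure mid-way through a vectored read, hand every
 * already-allocated buffer back to the pool, then rethrow.
 */
public class SafeVectoredRead {
  private final ArrayDeque<ByteBuffer> pool = new ArrayDeque<>();

  public List<ByteBuffer> readAll(int[] lengths, IntFunction<ByteBuffer> allocate)
      throws Exception {
    List<ByteBuffer> allocated = new ArrayList<>();
    try {
      for (int len : lengths) {
        if (len < 0) {                     // simulate a failed range read
          throw new Exception("read failed for length " + len);
        }
        allocated.add(allocate.apply(len));
      }
      return allocated;
    } catch (Exception e) {
      // Release everything allocated so far before raising the exception.
      allocated.forEach(pool::push);
      allocated.clear();
      throw e;
    }
  }

  public int pooledCount() {
    return pool.size();
  }
}
```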
@@ -1184,7 +1209,7 @@ ReadBufferManager getReadBufferManager() {
 */
@Override
public int minSeekForVectorReads() {
I'm actually going to recommend not coalescing vectors unless you have a good strategy to deal with partial failures, memory releases, retries etc. I did try to do that in s3a and gave up, even after extending the API to allow a "release" operation to be passed (which Parquet passes down FWIW). We've never seen failures with S3 in production, and coalesced ranges as Parquet/ORC row groups are too far apart. So it's better to focus on retry and recovery there than range coalescing.

In the case of Azure, we do 4 MB buffer reads, so if we don't do coalescing we waste a lot of the data read per range; hence we found that merging ranges which fall in one buffer, i.e. 4 MB, gives better results.
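The trade-off in the reply above can be modelled in a few lines. This is a simplified sketch, not the PR's actual `CombinedFileRange` logic: given sorted, non-overlapping ranges, merge neighbours whenever the combined span still fits inside one 4 MB read buffer, since ABFS pays for the whole buffer regardless:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of buffer-aware range coalescing: ranges whose combined span fits
 * in a single 4 MB read buffer are merged into one backend request.
 * Input ranges ({offset, length} pairs) must be sorted and non-overlapping.
 */
public class RangeCoalescer {
  public static final long BUFFER_SIZE = 4L * 1024 * 1024;  // 4 MB read buffer

  public static List<long[]> coalesce(List<long[]> sortedRanges) {
    List<long[]> merged = new ArrayList<>();
    for (long[] r : sortedRanges) {
      if (!merged.isEmpty()) {
        long[] last = merged.get(merged.size() - 1);
        long newEnd = r[0] + r[1];
        // Merge while the whole span still lands inside one buffer.
        if (newEnd - last[0] <= BUFFER_SIZE) {
          last[1] = newEnd - last[0];
          continue;
        }
      }
      merged.add(new long[]{r[0], r[1]});
    }
    return merged;
  }
}
```

With typical Parquet/ORC footers and column chunks that sit close together, several small ranges collapse into one 4 MB-bounded request, which is the "better results" the reply refers to.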
 * Performs a vectored direct read by fetching multiple non-contiguous
 * ranges in a single operation.
 */
VECTORED_DIRECT_READ("VDR"),

Azure does multi-range? nice

Here the direct read is used when we are not able to queue a vectored read in the read-ahead queue because read-ahead buffers are not available; in that case we make a direct readRemote call.
We have tried to take care of this in the VectoredReadHandler fanOut and directRead methods; can you please check whether you see any concerns with the code there.
Hi @mukund-thakur, gentle reminder for reviewing the PR

Hi @anmolanmol1234, thanks for implementing this in ABFS. I skimmed over the changes quickly and I think it is fairly complex, so it would be better if someone from the ABFS team reviews and commits this. I haven't touched this code in a long time, but I would be happy to answer interface-related questions. Thanks.
@Override
public int maxReadSizeForVectorReads() {
  return S_2M;
  return client.getAbfsConfiguration().getMaxSeekForVectoredReads();

This should be named in line with maxReadSizeForVectorReads
 * Configuration key that defines the maximum gap between adjacent read ranges
 * for merging ranges during vectored reads in ABFS: {@value}.
 */
public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS =

It would be good if you use consistent parameter names. fs.s3a.vectored.read.max.merged.size is the name in S3A.
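If the key were renamed to mirror the S3A convention cited above, the configuration might look like this. The `fs.azure.*` key name and the `4M` values below are hypothetical suggestions for illustration, not what the PR ships:

```xml
<!-- S3A's existing key, as cited in the review comment -->
<property>
  <name>fs.s3a.vectored.read.max.merged.size</name>
  <value>4M</value>
</property>

<!-- Hypothetical ABFS analogue following the same naming convention -->
<property>
  <name>fs.azure.vectored.read.max.merged.size</name>
  <value>4M</value>
</property>
```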
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;

public class ITestVectoredRead extends AbstractAbfsIntegrationTest {

Please make sure to run the ABFS contract tests in ITestAbfsFileSystemContractVectoredRead

Yes, all tests are passing here
Hi @steveloughran, have addressed your concerns, can you take a look once. Thanks
This PR introduces vectored read support in the Azure Blob File System (ABFS) driver to improve read performance for workloads that issue multiple small, non-contiguous read requests. Vectored reads enable batching of multiple read ranges into fewer network calls, reducing request overhead and improving throughput, which is especially beneficial for analytics engines like Spark.

The current ABFS read implementation performs a sequential, independent read operation for each requested range. This leads to:
an increased number of network calls
higher latency for small/random reads
inefficient utilization of bandwidth

Vectored I/O addresses these issues by coalescing multiple read requests into a single backend call, or fewer backend calls.
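From the caller's side, Hadoop exposes this through `PositionedReadable.readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)`. A plain-JDK analogue of the pattern, reading several non-contiguous ranges in one pass over a single open channel, looks like this (the class and method names here are illustrative stand-ins, not the Hadoop API):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/**
 * Plain-JDK analogue of a vectored read: fetch several non-contiguous
 * (offset, length) ranges from one open channel instead of issuing an
 * independent open/seek/read cycle per range.
 */
public class VectoredReadDemo {
  public static List<ByteBuffer> readRanges(Path file, long[][] ranges,
      IntFunction<ByteBuffer> allocate) throws IOException {
    List<ByteBuffer> results = new ArrayList<>();
    try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
      for (long[] r : ranges) {
        ByteBuffer buf = allocate.apply((int) r[1]);
        int read = ch.read(buf, r[0]);   // positional read; no shared seek state
        if (read != r[1]) {
          throw new IOException("short read at offset " + r[0]);
        }
        buf.flip();
        results.add(buf);
      }
    }
    return results;
  }
}
```

In the real API the filesystem is additionally free to reorder, coalesce, and complete the ranges asynchronously, which is where the ABFS-side batching in this PR comes in.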