From 89d0eecfd99710529fb16eb893c762bc7deb4d4c Mon Sep 17 00:00:00 2001 From: sumangala Date: Sat, 13 Feb 2021 16:10:08 +0530 Subject: [PATCH 01/14] fix eof --- .../apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- .../apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index c1de0312151377..e883a9b0612bdf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -497,7 +497,7 @@ public synchronized void seek(long n) throws IOException { if (n < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } - if (n > contentLength) { + if (n >= contentLength) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 52abb097ef3116..205e1aed4b3f37 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.EOFException; import java.util.Arrays; import java.util.Random; @@ -33,6 +34,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test read, write and seek. @@ -85,6 +87,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { assertNotEquals(-1, result); inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); + intercept(EOFException.class, () -> inputStream.seek(2*bufferSize)); } assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); From 5f1af39228a8ace39d6d783abdaa4bd89a694e85 Mon Sep 17 00:00:00 2001 From: sumangala Date: Sat, 13 Feb 2021 17:25:10 +0530 Subject: [PATCH 02/14] formatting --- .../apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 205e1aed4b3f37..2543c75dc90233 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -87,7 +87,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { assertNotEquals(-1, result); inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); - intercept(EOFException.class, () -> inputStream.seek(2*bufferSize)); + intercept(EOFException.class, () -> inputStream.seek(2 * bufferSize)); } assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); From 799d95990b7a8d6c134818eecb6c062b83c5e40b Mon Sep 17 00:00:00 2001 From: sumangala Date: Tue, 16 Feb 2021 11:11:55 +0530 Subject: [PATCH 03/14] test --- .../fs/azurebfs/services/AbfsInputStream.java | 4 +- .../azurebfs/ITestAbfsReadWriteAndSeek.java | 20 ++++++++-- 
.../ITestAzureBlobFileSystemRandomRead.java | 38 ++++++++++++------- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index e883a9b0612bdf..1b68096a735d4d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -538,8 +538,8 @@ public synchronized long skip(long n) throws IOException { newPos = 0; n = newPos - currentPos; } - if (newPos > contentLength) { - newPos = contentLength; + if (newPos >= contentLength) { + newPos = contentLength - 1; n = newPos - currentPos; } seek(newPos); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 2543c75dc90233..77cc56037e80dc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs; import java.io.EOFException; +import java.io.IOException; import java.util.Arrays; import java.util.Random; @@ -72,22 +73,33 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { abfsConfiguration.setReadBufferSize(bufferSize); - final byte[] b = new byte[2 * bufferSize]; + int contentLength = 2 * bufferSize; + final byte[] b = new byte[contentLength]; new Random().nextBytes(b); try (FSDataOutputStream stream = fs.create(TEST_PATH)) { stream.write(b); } - final byte[] readBuffer = new byte[2 * bufferSize]; + final byte[] readBuffer = new byte[contentLength]; int result; 
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) { + //seek to file mid and read until (excluding) the last byte inputStream.seek(bufferSize); - result = inputStream.read(readBuffer, bufferSize, bufferSize); + result = inputStream.read(readBuffer, bufferSize, bufferSize - 1); assertNotEquals(-1, result); + + //seek to first byte and read till file mid inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); - intercept(EOFException.class, () -> inputStream.seek(2 * bufferSize)); + //test seek EOF handling + intercept(EOFException.class, () -> inputStream.seek(contentLength)); + //seek to last valid position and read + inputStream.seek(contentLength - 1); + result = inputStream.read(readBuffer, contentLength - 1, 1); + assertNotEquals(-1, result); + //negative seek + intercept(IOException.class, () -> inputStream.seek(-1)); } assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index ef531acb2bbbca..6ac0f28aa823fb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES; @@ -203,17 +204,14 @@ public void testSkipBounds() throws Exception 
{ assertTrue(testFileLength > 0); - skipped = inputStream.skip(testFileLength); - assertEquals(testFileLength, skipped); - intercept(EOFException.class, - new Callable() { - @Override - public Long call() throws Exception { - return inputStream.skip(1); - } - } - ); + inputStream.seek(testFileLength - 1); //last valid pos, negative skip + skipped = inputStream.skip(-testFileLength+1); + assertEquals(-testFileLength + 1, skipped); + + skipped = inputStream.skip(testFileLength); //EOF + assertEquals(testFileLength - 1, skipped); + long elapsedTimeMs = timer.elapsedTimeMs(); assertTrue( String.format( @@ -251,15 +249,15 @@ public FSDataInputStream call() throws Exception { ); assertTrue("Test file length only " + testFileLength, testFileLength > 0); - inputStream.seek(testFileLength); - assertEquals(testFileLength, inputStream.getPos()); + inputStream.seek(testFileLength - 1); + assertEquals(testFileLength - 1, inputStream.getPos()); intercept(EOFException.class, FSExceptionMessages.CANNOT_SEEK_PAST_EOF, new Callable() { @Override public FSDataInputStream call() throws Exception { - inputStream.seek(testFileLength + 1); + inputStream.seek(testFileLength); return inputStream; } } @@ -356,7 +354,7 @@ public void testSkipAndAvailableAndPosition() throws Exception { byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; - assertEquals(testFileLength, inputStream.available()); + assertEquals(testFileLength, inputStream.available()); //test at offset 0 assertEquals(0, inputStream.getPos()); int n = 3; @@ -402,6 +400,18 @@ public void testSkipAndAvailableAndPosition() throws Exception { inputStream.getPos()); assertEquals(testFileLength - inputStream.getPos(), inputStream.available()); + + skipped = inputStream.skip(testFileLength + 1); //goes to last byte + assertEquals(1, inputStream.available()); + bytesRead = inputStream.read(buffer); + assertEquals(1, bytesRead); + assertEquals(testFileLength, 
inputStream.getPos()); + + byte[] buffer2 = new byte[DEFAULT_READ_BUFFER_SIZE + 10]; + inputStream.seek(0); + bytesRead = inputStream.read(buffer2); + assertEquals(buffer2.length, bytesRead); + assertEquals(testFileLength - buffer2.length, inputStream.available()); } } From a4778625ab4513130df0ba6ed5f4ea49bc2a44ba Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 17 Feb 2021 11:25:08 +0530 Subject: [PATCH 04/14] seek to 0 for empty file --- .../org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 1b68096a735d4d..f68fd25fafb7de 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -497,7 +497,7 @@ public synchronized void seek(long n) throws IOException { if (n < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } - if (n >= contentLength) { + if (n > 0 && n >= contentLength) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } From eafcccf2d8df0b1e17c7c80064e171ee2bc2fb18 Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 17 Feb 2021 18:45:23 +0530 Subject: [PATCH 05/14] seek stats fix --- .../fs/azurebfs/services/AbfsInputStream.java | 4 +++- .../AbfsInputStreamStatisticsImpl.java | 2 ++ .../ITestAbfsInputStreamStatistics.java | 21 +++++++++++-------- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index f68fd25fafb7de..17798804267901 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -76,7 +76,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private CachedSASToken cachedSasToken; private byte[] buffer = null; // will be initialized on first use - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server + public long fCursor = 0; // cursor of buffer within file - offset of next byte + // to read from remote server private long fCursorAfterLastRead = -1; private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 @@ -498,6 +499,7 @@ public synchronized void seek(long n) throws IOException { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } if (n > 0 && n >= contentLength) { +// if (n > contentLength) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java index bd09762976d7f8..90cdb280be2b7f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java @@ -102,8 +102,10 @@ public void seekForwards(long skipped) { public void seek(long seekTo, long currentPos) { if (seekTo >= currentPos) { this.seekForwards(seekTo - currentPos); +// System.out.println(seekTo + " " + currentPos + " fwd"); } else { this.seekBackwards(currentPos - seekTo); +// System.out.println(seekTo + " 
" + currentPos + " bwd"); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index a33a76ecefe776..79a4705b9f4809 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -42,6 +42,7 @@ public class ITestAbfsInputStreamStatistics private static final int ONE_KB = 1024; private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024; private byte[] defBuffer = new byte[ONE_MB]; + private byte[] defBuffer2 = new byte[ONE_MB * 5]; public ITestAbfsInputStreamStatistics() throws Exception { } @@ -104,24 +105,24 @@ public void testSeekStatistics() throws IOException { out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath); //Writing a default buffer in a file. - out.write(defBuffer); + out.write(defBuffer2); out.hflush(); in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics()); /* - * Writing 1MB buffer to the file, this would make the fCursor(Current - * position of cursor) to the end of file. + * Writing 4MB buffer to the file */ - int result = in.read(defBuffer, 0, ONE_MB); + int result = in.read(defBuffer2, 0, ONE_MB * 4); +// System.out.println(in.fCursor + " " + in.length() + in.available()); LOG.info("Result of read : {}", result); /* - * Seeking to start of file and then back to end would result in a - * backward and a forward seek respectively 10 times. + * Seeking to start of file and then back to 4MB position would result + * in a backward and a forward seek respectively 10 times. 
*/ for (int i = 0; i < OPERATIONS; i++) { in.seek(0); - in.seek(ONE_MB); + in.seek(ONE_MB * 4); } AbfsInputStreamStatisticsImpl stats = @@ -152,6 +153,8 @@ public void testSeekStatistics() throws IOException { * would be equal to 2 * OPERATIONS. * */ +// System.out.println(stats.getSeekOperations() + " " + +// stats.getForwardSeekOperations() + " " + stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in seekOps value", 2 * OPERATIONS, stats.getSeekOperations()); assertEquals("Mismatch in backwardSeekOps value", OPERATIONS, @@ -159,7 +162,7 @@ public void testSeekStatistics() throws IOException { assertEquals("Mismatch in forwardSeekOps value", OPERATIONS, stats.getForwardSeekOperations()); assertEquals("Mismatch in bytesBackwardsOnSeek value", - OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek()); + OPERATIONS * ONE_MB * 4, stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS, @@ -279,7 +282,7 @@ public void testWithNullStreamStatistics() throws IOException { // Verifying that AbfsInputStream Operations works with null statistics. assertNotEquals("AbfsInputStream read() with null statistics should " + "work", -1, in.read()); - in.seek(ONE_KB); + in.seek(ONE_KB - 1); // Verifying toString() with no StreamStatistics. 
LOG.info("AbfsInputStream: {}", in.toString()); From f2bc31020afe368555efd61731d46f2ada5599aa Mon Sep 17 00:00:00 2001 From: sumangala Date: Wed, 17 Feb 2021 22:14:38 +0530 Subject: [PATCH 06/14] modify read buf sz fwd seek --- .../fs/azurebfs/services/AbfsInputStream.java | 1 - .../AbfsInputStreamStatisticsImpl.java | 2 -- .../ITestAbfsInputStreamStatistics.java | 28 +++++++++---------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 17798804267901..01c2eb6a17495d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -499,7 +499,6 @@ public synchronized void seek(long n) throws IOException { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } if (n > 0 && n >= contentLength) { -// if (n > contentLength) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java index 90cdb280be2b7f..bd09762976d7f8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java @@ -102,10 +102,8 @@ public void seekForwards(long skipped) { public void seek(long seekTo, long currentPos) { if (seekTo >= currentPos) { this.seekForwards(seekTo - currentPos); -// System.out.println(seekTo + " " + currentPos + " fwd"); } else { this.seekBackwards(currentPos - 
seekTo); -// System.out.println(seekTo + " " + currentPos + " bwd"); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 79a4705b9f4809..9bab162709e036 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -42,7 +42,6 @@ public class ITestAbfsInputStreamStatistics private static final int ONE_KB = 1024; private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024; private byte[] defBuffer = new byte[ONE_MB]; - private byte[] defBuffer2 = new byte[ONE_MB * 5]; public ITestAbfsInputStreamStatistics() throws Exception { } @@ -101,28 +100,31 @@ public void testSeekStatistics() throws IOException { AbfsOutputStream out = null; AbfsInputStream in = null; + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] buf = new byte[readBufferSize + 1]; + try { out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath); - //Writing a default buffer in a file. - out.write(defBuffer2); + //Writing buffer to file + out.write(buf); out.hflush(); in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics()); /* - * Writing 4MB buffer to the file + * Reading buffer size from file. After read, fCursor = readBufferSize + * Last valid offset in file is (readBufferSize - 1) */ - int result = in.read(defBuffer2, 0, ONE_MB * 4); -// System.out.println(in.fCursor + " " + in.length() + in.available()); + int result = in.read(buf, 0, readBufferSize); LOG.info("Result of read : {}", result); /* - * Seeking to start of file and then back to 4MB position would result - * in a backward and a forward seek respectively 10 times. 
+ * Seeking to start of file and then back to readBufferSize position would + * result in a backward and a forward seek respectively 10 times. */ for (int i = 0; i < OPERATIONS; i++) { in.seek(0); - in.seek(ONE_MB * 4); + in.seek(readBufferSize); } AbfsInputStreamStatisticsImpl stats = @@ -141,8 +143,8 @@ public void testSeekStatistics() throws IOException { * for OPERATION times, total forward seeks would be OPERATIONS. * * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from - * end of file in a ONE_MB file each time, this would mean the bytes from - * backward seek would be OPERATIONS * ONE_MB. + * end of file in a readBufferSize file each time, this would mean the bytes from + * backward seek would be OPERATIONS * readBufferSize. * * bytesSkippedOnSeek - Since, we move from start to end in seek, but * our fCursor(position of cursor) always remain at end of file, this @@ -153,8 +155,6 @@ public void testSeekStatistics() throws IOException { * would be equal to 2 * OPERATIONS. 
* */ -// System.out.println(stats.getSeekOperations() + " " + -// stats.getForwardSeekOperations() + " " + stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in seekOps value", 2 * OPERATIONS, stats.getSeekOperations()); assertEquals("Mismatch in backwardSeekOps value", OPERATIONS, @@ -162,7 +162,7 @@ public void testSeekStatistics() throws IOException { assertEquals("Mismatch in forwardSeekOps value", OPERATIONS, stats.getForwardSeekOperations()); assertEquals("Mismatch in bytesBackwardsOnSeek value", - OPERATIONS * ONE_MB * 4, stats.getBytesBackwardsOnSeek()); + OPERATIONS * readBufferSize, stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS, From 895be01c34a11cf1e6802a271003729dd7e9bcdb Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 18 Feb 2021 11:12:56 +0530 Subject: [PATCH 07/14] checkstyle --- .../hadoop/fs/azurebfs/services/AbfsInputStream.java | 3 +-- .../fs/azurebfs/ITestAbfsInputStreamStatistics.java | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fa4b85e0f75335..fc07a148a9164a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -84,8 +84,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private CachedSASToken cachedSasToken; private byte[] buffer = null; // will be initialized on first use - public long fCursor = 0; // cursor of buffer within file - offset of next byte - // to read from remote server + private long fCursor = 0; // cursor of buffer within file - offset of next byte to 
read from remote server private long fCursorAfterLastRead = -1; private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 9bab162709e036..16d0901eb52bd1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -112,8 +112,8 @@ public void testSeekStatistics() throws IOException { in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics()); /* - * Reading buffer size from file. After read, fCursor = readBufferSize - * Last valid offset in file is (readBufferSize - 1) + * Reading from file. After read, fCursor = readBufferSize + * Last valid offset in file is readBufferSize */ int result = in.read(buf, 0, readBufferSize); LOG.info("Result of read : {}", result); @@ -143,8 +143,8 @@ public void testSeekStatistics() throws IOException { * for OPERATION times, total forward seeks would be OPERATIONS. * * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from - * end of file in a readBufferSize file each time, this would mean the bytes from - * backward seek would be OPERATIONS * readBufferSize. + * last byte of file over readBufferSize bytes each time, this would mean + * the bytes from backward seek would be OPERATIONS * readBufferSize. 
* * bytesSkippedOnSeek - Since, we move from start to end in seek, but * our fCursor(position of cursor) always remain at end of file, this From c4331e814306732d7f70ac092c350b9253db6c85 Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 18 Feb 2021 15:00:40 +0530 Subject: [PATCH 08/14] test --- .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index 6ac0f28aa823fb..4defea9bde307f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES; @@ -407,7 +406,7 @@ public void testSkipAndAvailableAndPosition() throws Exception { assertEquals(1, bytesRead); assertEquals(testFileLength, inputStream.getPos()); - byte[] buffer2 = new byte[DEFAULT_READ_BUFFER_SIZE + 10]; + byte[] buffer2 = new byte[getConfiguration().getReadBufferSize() + 10]; inputStream.seek(0); bytesRead = inputStream.read(buffer2); assertEquals(buffer2.length, bytesRead); From 213efddc76042a1fb4102b249452b8a1f381beff Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 18 Feb 2021 22:38:43 +0530 Subject: [PATCH 09/14] skip fix --- .../fs/azurebfs/services/AbfsInputStream.java | 
2 +- .../azurebfs/ITestAbfsReadWriteAndSeek.java | 3 +- .../ITestAzureBlobFileSystemRandomRead.java | 30 ++++++++++++------- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fc07a148a9164a..dd0aecef618ff1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -583,7 +583,7 @@ public synchronized long skip(long n) throws IOException { newPos = 0; n = newPos - currentPos; } - if (newPos >= contentLength) { + if (newPos > 0 && newPos >= contentLength) { newPos = contentLength - 1; n = newPos - currentPos; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 77cc56037e80dc..27a953b0f51e3b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -88,7 +88,6 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(bufferSize); result = inputStream.read(readBuffer, bufferSize, bufferSize - 1); assertNotEquals(-1, result); - //seek to first byte and read till file mid inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); @@ -97,7 +96,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { //seek to last valid position and read inputStream.seek(contentLength - 1); result = inputStream.read(readBuffer, contentLength - 1, 1); - assertNotEquals(-1, result); + assertNotEquals("Read should 
succeed for last byte", -1, result); //negative seek intercept(IOException.class, () -> inputStream.seek(-1)); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index 4defea9bde307f..6513d17678777c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -353,7 +353,7 @@ public void testSkipAndAvailableAndPosition() throws Exception { byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; - assertEquals(testFileLength, inputStream.available()); //test at offset 0 + assertEquals(testFileLength, inputStream.available()); assertEquals(0, inputStream.getPos()); int n = 3; @@ -401,19 +401,29 @@ public void testSkipAndAvailableAndPosition() throws Exception { inputStream.available()); skipped = inputStream.skip(testFileLength + 1); //goes to last byte - assertEquals(1, inputStream.available()); + assertEquals("One byte should be available after skip to EOF", 1, + inputStream.available()); bytesRead = inputStream.read(buffer); - assertEquals(1, bytesRead); - assertEquals(testFileLength, inputStream.getPos()); - - byte[] buffer2 = new byte[getConfiguration().getReadBufferSize() + 10]; - inputStream.seek(0); - bytesRead = inputStream.read(buffer2); - assertEquals(buffer2.length, bytesRead); - assertEquals(testFileLength - buffer2.length, inputStream.available()); + assertEquals("Incorrect read byte count", 1, bytesRead); + assertEquals("Incorrect position post read", testFileLength, + inputStream.getPos()); } } + @Test + public void testZeroByteFile() throws IOException { + Path emptyFile = new Path("/emptyFile"); + getFileSystem().create(emptyFile); 
+ FSDataInputStream in = getFileSystem().open(emptyFile); + assertEquals("Initial position of inputstream in empty fils 0", 0, + in.getPos()); + in.seek(0); + assertEquals("Seek to 0 should succeed", 0, in.getPos()); + in.skip(0); + assertEquals("Skip 0 should succeed", 0, in.getPos()); + assertEquals("Available bytes in empty file is 0", 0, in.available()); + } + /** * Ensures parity in the performance of sequential read after reverse seek for * abfs of the AbfsInputStream. From 010540068c3b9af8e6f3f265094202cfe52169c5 Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 18 Feb 2021 22:51:02 +0530 Subject: [PATCH 10/14] assert msg --- .../fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index 6513d17678777c..c17c0908d57ae1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -206,10 +206,10 @@ public void testSkipBounds() throws Exception { inputStream.seek(testFileLength - 1); //last valid pos, negative skip skipped = inputStream.skip(-testFileLength+1); - assertEquals(-testFileLength + 1, skipped); + assertEquals("Incorrect skip count", -testFileLength + 1, skipped); skipped = inputStream.skip(testFileLength); //EOF - assertEquals(testFileLength - 1, skipped); + assertEquals("Incorrect skip count", testFileLength - 1, skipped); long elapsedTimeMs = timer.elapsedTimeMs(); assertTrue( From 4ce816bde6053356dd113c4131b0f25271a98d7f Mon Sep 17 00:00:00 2001 From: sumangala Date: Sat, 20 Feb 2021 20:20:24 +0530 Subject: [PATCH 11/14] complete tests --- 
.../azurebfs/ITestAbfsReadWriteAndSeek.java | 2 +- .../ITestAzureBlobFileSystemRandomRead.java | 46 +++++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 27a953b0f51e3b..b76c22311d2598 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -91,7 +91,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { //seek to first byte and read till file mid inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); - //test seek EOF handling + //test seek beyond EOF handling intercept(EOFException.class, () -> inputStream.seek(contentLength)); //seek to last valid position and read inputStream.seek(contentLength - 1); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index c17c0908d57ae1..f2bd37059a6f71 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -203,20 +203,34 @@ public void testSkipBounds() throws Exception { assertTrue(testFileLength > 0); + //test skip to EOF with correct input skip count + assertEquals("Position should be 0", 0, inputStream.getPos()); + inputStream.skip(testFileLength - 1); + assertEquals("Position should be EOF", testFileLength - 1, + inputStream.getPos()); + + long elapsedTimeMs = timer.elapsedTimeMs(); + assertTrue( + String.format( + 
"There should not be any network I/O (elapsedTimeMs=%1$d).", + elapsedTimeMs), + elapsedTimeMs < MAX_ELAPSEDTIMEMS); - inputStream.seek(testFileLength - 1); //last valid pos, negative skip + //test negative skip from last valid position skipped = inputStream.skip(-testFileLength+1); assertEquals("Incorrect skip count", -testFileLength + 1, skipped); + assertEquals("Position should be 0", 0, inputStream.getPos()); - skipped = inputStream.skip(testFileLength); //EOF + //test large positive skip from position 0 beyond EOF + skipped = inputStream.skip(testFileLength); assertEquals("Incorrect skip count", testFileLength - 1, skipped); - long elapsedTimeMs = timer.elapsedTimeMs(); - assertTrue( - String.format( - "There should not be any network I/O (elapsedTimeMs=%1$d).", - elapsedTimeMs), - elapsedTimeMs < MAX_ELAPSEDTIMEMS); + //test positive skip from contentlength position (EOF + 1) + inputStream.read(); //read 1 byte from EOF + assertEquals("Position should be testFileLength", testFileLength, + inputStream.getPos()); + intercept(EOFException.class, FSExceptionMessages.CANNOT_SEEK_PAST_EOF, + () -> inputStream.skip(1)); } } @@ -400,18 +414,14 @@ public void testSkipAndAvailableAndPosition() throws Exception { assertEquals(testFileLength - inputStream.getPos(), inputStream.available()); - skipped = inputStream.skip(testFileLength + 1); //goes to last byte + inputStream.skip(testFileLength + 1); //goes to last byte assertEquals("One byte should be available after skip to EOF", 1, inputStream.available()); - bytesRead = inputStream.read(buffer); - assertEquals("Incorrect read byte count", 1, bytesRead); - assertEquals("Incorrect position post read", testFileLength, - inputStream.getPos()); } } @Test - public void testZeroByteFile() throws IOException { + public void testZeroByteFile() throws Exception { Path emptyFile = new Path("/emptyFile"); getFileSystem().create(emptyFile); FSDataInputStream in = getFileSystem().open(emptyFile); @@ -422,6 +432,14 @@ public void
testZeroByteFile() throws IOException { in.skip(0); assertEquals("Skip 0 should succeed", 0, in.getPos()); assertEquals("Available bytes in empty file is 0", 0, in.available()); + + intercept(EOFException.class, () -> in.seek(1)); + intercept(EOFException.class, () -> in.seek(-1)); + //skip(1) from position 0 does not seek(0) since pos = contentlength + intercept(EOFException.class, () -> in.skip(1)); + in.skip(-1); + assertEquals("Should seek to 0", 0, in.getPos()); + } /** From 552ed409ebf0d9f1a1e5d7e71ab29d2b52819f37 Mon Sep 17 00:00:00 2001 From: sumangala Date: Sat, 20 Feb 2021 20:31:52 +0530 Subject: [PATCH 12/14] minor edit --- .../fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index f2bd37059a6f71..41152b89d02c27 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -224,6 +224,8 @@ public void testSkipBounds() throws Exception { //test large positive skip from position 0 beyond EOF skipped = inputStream.skip(testFileLength); assertEquals("Incorrect skip count", testFileLength - 1, skipped); + assertEquals("One byte should be available after skip to EOF", 1, + inputStream.available()); //test positive skip from contentlength position (EOF + 1) inputStream.read(); //read 1 byte from EOF @@ -413,10 +415,6 @@ public void testSkipAndAvailableAndPosition() throws Exception { inputStream.getPos()); assertEquals(testFileLength - inputStream.getPos(), inputStream.available()); - - inputStream.skip(testFileLength + 1); //goes to last byte - assertEquals("One byte should be available
after skip to EOF", 1, - inputStream.available()); } } @@ -425,7 +423,7 @@ public void testZeroByteFile() throws Exception { Path emptyFile = new Path("/emptyFile"); getFileSystem().create(emptyFile); FSDataInputStream in = getFileSystem().open(emptyFile); - assertEquals("Initial position of inputstream in empty fils 0", 0, + assertEquals("Initial position of inputstream in empty file is 0", 0, in.getPos()); in.seek(0); assertEquals("Seek to 0 should succeed", 0, in.getPos()); @@ -439,7 +437,6 @@ public void testZeroByteFile() throws Exception { intercept(EOFException.class, () -> in.skip(1)); in.skip(-1); assertEquals("Should seek to 0", 0, in.getPos()); - } /** From 338364333ead66b64082a9f7b2b635eb871c354f Mon Sep 17 00:00:00 2001 From: sumangala Date: Tue, 23 Feb 2021 22:06:53 +0530 Subject: [PATCH 13/14] revw --- .../azurebfs/ITestAzureBlobFileSystemRandomRead.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index 41152b89d02c27..c91c15c937a75c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -217,7 +217,7 @@ public void testSkipBounds() throws Exception { elapsedTimeMs < MAX_ELAPSEDTIMEMS); //test negative skip from last valid position - skipped = inputStream.skip(-testFileLength+1); + skipped = inputStream.skip(-testFileLength + 1); assertEquals("Incorrect skip count", -testFileLength + 1, skipped); assertEquals("Position should be 0", 0, inputStream.getPos()); @@ -427,15 +427,17 @@ public void testZeroByteFile() throws Exception { in.getPos()); in.seek(0); assertEquals("Seek to 0 should succeed", 0, in.getPos()); - 
in.skip(0); - assertEquals("Skip 0 should succeed", 0, in.getPos()); + long skipped = in.skip(0); + assertEquals("Number of skipped bytes should be 0", 0, skipped); + assertEquals("Position should be 0 post skip 0", 0, in.getPos()); assertEquals("Available bytes in empty file is 0", 0, in.available()); intercept(EOFException.class, () -> in.seek(1)); intercept(EOFException.class, () -> in.seek(-1)); //skip(1) from position 0 does not seek(0) since pos = contentlength intercept(EOFException.class, () -> in.skip(1)); - in.skip(-1); + skipped = in.skip(-1); + assertEquals("Number of skipped bytes should be 0", 0, skipped); assertEquals("Should seek to 0", 0, in.getPos()); } From 0762ee10bb7f166cc8488f84e11562a3262f0640 Mon Sep 17 00:00:00 2001 From: sumangala Date: Thu, 25 Feb 2021 11:04:19 +0530 Subject: [PATCH 14/14] seek stats diff --- .../ITestAbfsInputStreamStatistics.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 16d0901eb52bd1..8289cd299ded80 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; @@ -33,6 +34,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.io.IOUtils; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; + public class ITestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest { private static final int OPERATIONS = 10; @@ -89,42 +92,41 
@@ public void testInitValues() throws IOException { * Test to check statistics from seek operation in AbfsInputStream. */ @Test - public void testSeekStatistics() throws IOException { + public void testSeekStatistics() throws Exception { describe("Testing the values of statistics from seek operations in " + "AbfsInputStream"); - AzureBlobFileSystem fs = getFileSystem(); + Configuration config = getRawConfiguration(); + config.set(AZURE_READ_BUFFER_SIZE, String.valueOf(ONE_MB - 1)); + AzureBlobFileSystem fs = getFileSystem(config); AzureBlobFileSystemStore abfss = fs.getAbfsStore(); Path seekStatPath = path(getMethodName()); AbfsOutputStream out = null; AbfsInputStream in = null; - int readBufferSize = getConfiguration().getReadBufferSize(); - byte[] buf = new byte[readBufferSize + 1]; - try { out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath); - //Writing buffer to file - out.write(buf); + //Writing a default buffer in a file. + out.write(defBuffer); out.hflush(); in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics()); /* - * Reading from file. After read, fCursor = readBufferSize - * Last valid offset in file is readBufferSize + * Reading ONE_MB - 1 bytes from the file, this would move the fCursor + * (current position of cursor) to offset ONE_MB - 1. */ - int result = in.read(buf, 0, readBufferSize); + int result = in.read(defBuffer, 0, ONE_MB - 1); LOG.info("Result of read : {}", result); /* - * Seeking to start of file and then back to readBufferSize position would - * result in a backward and a forward seek respectively 10 times. + * Seeking to start of file and then back to end would result in a + * backward and a forward seek respectively 10 times. */ for (int i = 0; i < OPERATIONS; i++) { in.seek(0); - in.seek(readBufferSize); + in.seek(ONE_MB - 1); } AbfsInputStreamStatisticsImpl stats = @@ -143,8 +145,8 @@ public void testSeekStatistics() throws IOException { * for OPERATION times, total forward seeks would be OPERATIONS.
* * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from - * last byte of file over readBufferSize bytes each time, this would mean - * the bytes from backward seek would be OPERATIONS * readBufferSize. + * end of file in a ONE_MB file each time, this would mean the bytes from + * backward seek would be OPERATIONS * (ONE_MB - 1). * * bytesSkippedOnSeek - Since, we move from start to end in seek, but * our fCursor(position of cursor) always remain at end of file, this @@ -162,7 +164,7 @@ public void testSeekStatistics() throws IOException { assertEquals("Mismatch in forwardSeekOps value", OPERATIONS, stats.getForwardSeekOperations()); assertEquals("Mismatch in bytesBackwardsOnSeek value", - OPERATIONS * readBufferSize, stats.getBytesBackwardsOnSeek()); + OPERATIONS * (ONE_MB - 1), stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,