Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
* This class represents a replica that is waiting to be recovered.
* After a datanode restart, any replica in "rbw" directory is loaded
* as a replica waiting to be recovered.
* A replica waiting to be recovered provisions read access for the bytes
* validated on load, but does not participate in any pipeline recovery.
* It will become outdated if its client continues to write or be recovered
* as a result of lease recovery.
*/
public class ReplicaWaitingToBeRecovered extends LocalReplica {

Expand Down Expand Up @@ -73,7 +73,7 @@ public ReplicaState getState() {

@Override //ReplicaInfo
public long getVisibleLength() {
  // The on-disk bytes were validated when this replica was loaded from the
  // "rbw" directory after restart, so the full length is visible to readers.
  // Returning -1 here (the old behavior) made under-construction files
  // unreadable after a DataNode restart (CannotObtainBlockLengthException).
  return getNumBytes(); // all bytes are visible since validated on load
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,87 @@ public void testEarlierVersionEditLog() throws Exception {
cluster.shutdown();
}
}

/**
 * Test that an under-construction file can be read after DataNode restart.
 * This is a regression test for HDFS-17863: After hflush(), HDFS guarantees
 * that written data becomes visible to readers, even while the file remains
 * under construction. This guarantee must hold even after DataNode restart.
 *
 * The scenario is:
 * 1. Client creates a file and writes data with hflush()
 * 2. DataNode restarts (simulating failure/recovery)
 * 3. A reader (new client) should be able to read the flushed data
 *
 * Before the fix, step 3 would fail with CannotObtainBlockLengthException
 * because ReplicaWaitingToBeRecovered.getVisibleLength() returned -1.
 */
@Test
public void testReadUnderConstructionFileAfterDataNodeRestart()
    throws Exception {
  final Configuration conf = new HdfsConfiguration();
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
      0);
  MiniDFSCluster cluster = null;
  FSDataOutputStream out = null;

  try {
    // Use replication factor of 1 to simplify the test.
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path testPath = new Path("/testReadAfterDNRestart");

    // Create a file and hflush() some data; the file stays
    // under construction for the rest of the test.
    byte[] testData = "Hello HDFS".getBytes();
    out = fs.create(testPath, (short) 1);
    out.write(testData);
    out.hflush();

    // Sanity check: the flushed data must be readable BEFORE the restart.
    // try-with-resources guarantees the stream is closed even when an
    // assertion fails (the original leaked it on failure).
    try (FSDataInputStream in = fs.open(testPath)) {
      byte[] readBuf = new byte[testData.length];
      int bytesRead = in.read(readBuf);
      assertEquals(testData.length, bytesRead,
          "Should read all bytes before DN restart");
      assertArrayEquals(testData, readBuf,
          "Data should match before DN restart");
    }

    // Restart the DataNode - this simulates a DataNode failure/recovery.
    // The block will be loaded as ReplicaWaitingToBeRecovered.
    cluster.restartDataNode(0);
    cluster.waitActive();

    // Wait for the DataNode to re-register with the NameNode
    // and complete its first block report.
    cluster.waitFirstBRCompleted(0, 30000);

    // Try to read the file again with a NEW file system instance,
    // simulating a different client reading the file.
    // Before the fix, this threw CannotObtainBlockLengthException
    // because ReplicaWaitingToBeRecovered.getVisibleLength() returned -1.
    Configuration readerConf = new HdfsConfiguration();
    try (FileSystem readerFs =
            FileSystem.newInstance(cluster.getURI(), readerConf);
        FSDataInputStream in2 = readerFs.open(testPath)) {
      byte[] readBuf2 = new byte[testData.length];
      int bytesRead2 = in2.read(readBuf2);
      assertEquals(testData.length, bytesRead2,
          "Should read all bytes after DN restart");
      assertArrayEquals(testData, readBuf2,
          "Data should match after DN restart");
    }
  } finally {
    // Close the writer if it is still open; shut the cluster down last.
    IOUtils.closeStream(out);
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
}