Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DataTypes/NestedUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace Nested
std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_case = false);
}

/// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple.
/// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple.
/// It can extract a column from a multiple nested type column, e.g. named Tuple in named Tuple
/// Keeps some intermediate datas to avoid rebuild them multi-times.
class NestedColumnExtractHelper
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/IRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ Chunk IRowInputFormat::generate()
if (total_rows == 0)
readPrefix();

if (is_stopped)
return {};

const Block & header = getPort().getHeader();

size_t num_columns = header.columns();
Expand Down
7 changes: 7 additions & 0 deletions src/Processors/Formats/IRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class IRowInputFormat : public IInputFormat

Chunk generate() override;

void onCancel() override
{
is_stopped = true;
}

void resetParser() override;

protected:
Expand Down Expand Up @@ -79,6 +84,8 @@ class IRowInputFormat : public IInputFormat
size_t num_errors = 0;

BlockMissingValues block_missing_values;

std::atomic<int> is_stopped{0};
};

}
2 changes: 2 additions & 0 deletions src/Storages/HDFS/StorageHDFSCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class StorageHDFSCluster : public IStorage

std::string getName() const override { return "HDFSCluster"; }

bool isRemote() const override { return true; }

Pipe read(const Names &, const StorageSnapshotPtr &, SelectQueryInfo &,
ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, unsigned /*num_streams*/) override;

Expand Down
23 changes: 19 additions & 4 deletions src/Storages/Hive/StorageHive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class StorageHiveSource : public ISource, WithContext

Chunk generate() override
{
while (true)
while (!isCancelled())
{
bool need_next_file
= (!generate_chunk_from_metadata && !reader) || (generate_chunk_from_metadata && !current_file_remained_rows);
Expand Down Expand Up @@ -296,10 +296,15 @@ class StorageHiveSource : public ISource, WithContext
return getResultChunk(source_block, num_rows);
}

reader.reset();
pipeline.reset();
read_buf.reset();
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
}
}

return {};
}

Chunk generateChunkFromMetadata()
Expand Down Expand Up @@ -401,6 +406,13 @@ class StorageHiveSource : public ISource, WithContext
return Chunk(std::move(cols), rows);
}

void onCancel() override
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}

private:
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
Expand All @@ -425,6 +437,9 @@ class StorageHiveSource : public ISource, WithContext
bool generate_chunk_from_metadata{false};
UInt64 current_file_remained_rows = 0;

/// onCancel and generate can be called concurrently
std::mutex reader_mutex;

Poco::Logger * log = &Poco::Logger::get("StorageHive");
};

Expand Down