Rework data loaders #3727

@westonpace

Description

We have a basic data loader for pytorch. However, this data loader has several limitations and is not very intuitive. There are also other training libraries (e.g. Ray Train, TensorFlow, etc.) and it would be good to ensure a consistent experience across all of them. This issue captures a lot of feedback (and we welcome more!) that we have gathered over the years in hopes of motivating a new design.

Clearly document fork-safety

Spawn is considerably slower than fork. In addition, users have often missed our guidelines on fork safety in the past and ended up frustrated with a hung process. I think #3584 will help here but there is more we could do.

  • Improve our documentation on fork safety (we should improve our documentation on data loading in general, see below)
  • Make lance fork-safe. I think this would mostly involve resetting static state (e.g. RT, LOOP) after a fork. However, this is very difficult to guarantee in general and would likely be an ongoing challenge.
  • Document how to use forkserver. It is very possible to use forkserver safely with lance; one just needs to take care that lance is loaded after the forkserver is started. Improved documentation on data loading should demonstrate this (see the sketch below).
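
For illustration, here is a minimal sketch of the forkserver pattern with a torch DataLoader. The wrapper class and the dataset path are hypothetical, and the exact recipe should be confirmed in the eventual docs:

```python
# Sketch: run DataLoader workers with the forkserver context so they do not
# inherit a forked copy of lance's static state (RT, LOOP).
import torch.utils.data as td


class LanceIterableDataset(td.IterableDataset):
    """Hypothetical wrapper; opens the dataset inside each worker."""

    def __init__(self, uri: str):
        self.uri = uri

    def __iter__(self):
        # Import and open lance lazily, inside the worker process, so lance
        # is only loaded after the worker has been created.
        import lance

        ds = lance.dataset(self.uri)
        yield from ds.to_batches()


if __name__ == "__main__":
    loader = td.DataLoader(
        LanceIterableDataset("path/to/dataset.lance"),  # hypothetical path
        batch_size=None,                 # batches already come from the scanner
        num_workers=4,
        multiprocessing_context="forkserver",
    )
    for batch in loader:
        ...  # training step
```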

Provide both map-based and iterator-based datasets

We currently only have an implementation of torch's iterator-based datasets. Several users have created their own map-based data loaders, and these are useful in a variety of situations. However, properly implementing a map-based data loader can be tricky (e.g. you need to understand row addresses and pin to a dataset version, you should use the slightly hidden __getitems__ method, and it is not clear how to do shuffling with restarts).
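
As a rough sketch (the class name is illustrative, not a committed API), a map-style dataset could pin to a version and use the batched __getitems__ hook so torch fetches many rows with a single take() call:

```python
# Sketch of a map-style torch dataset over a Lance dataset.
import lance
import torch.utils.data


class LanceMapDataset(torch.utils.data.Dataset):
    def __init__(self, uri: str, version: int, columns=None):
        # Pin to a version so restarts and multiple workers see the same rows.
        self.ds = lance.dataset(uri, version=version)
        self.columns = columns
        self._len = self.ds.count_rows()

    def __len__(self):
        return self._len

    def __getitem__(self, idx):
        return self.__getitems__([idx])[0]

    def __getitems__(self, indices):
        # One random-access take per batch of indices; returns a pyarrow Table.
        table = self.ds.take(indices, columns=self.columns)
        return table.to_pylist()
```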

Simplify filtering & shuffling

The filtering and shuffling in the current data loader is confusing, and PyTorch's sampler abstraction is itself confusing. I think a much simpler model could be devised in Lance. From Lance's perspective, these are the shuffling scenarios I am aware of and the approaches that should be taken:

  • No shuffling, no filter: An iterable dataset should be used for best performance.
  • No shuffling, with filter: We should decide between a scan (iterable dataset) and a take (map dataset) based on the filter's selectivity (assuming there is an index on the filter column, this can be estimated cheaply).
  • Shard shuffling, no filter: An iterable dataset should be used. The shard size should be completely independent of the fragment size; there is no benefit to coupling the two and it muddies the waters. Offsets & limits should be used for sharding (see the sketch after this list).
  • Shard shuffling, with filter: If the filter is not very selective we can still use an iterable dataset and filter after partitioning. If the filter is highly selective, falling back to a map dataset is probably easiest. We can suggest a materialized view or local storage if higher performance is desired.
  • Row shuffling, with or without filter: Shuffle the (possibly filtered) row ids and run a map dataset on top of them. Consider a materialized view or local storage if performance is not sufficient.
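
A minimal sketch of the shard-shuffling case, assuming a hypothetical dataset path and using the scanner's offset/limit parameters; the shard size is chosen independently of the fragment size:

```python
# Shard shuffling with offsets & limits: shard boundaries come from a fixed
# shard_size, the shard order is shuffled per epoch, and each shard is read
# with a plain offset/limit scan.
import random

import lance

ds = lance.dataset("path/to/dataset.lance")  # hypothetical path
shard_size = 10_000
num_rows = ds.count_rows()
shard_offsets = list(range(0, num_rows, shard_size))

epoch = 0
rng = random.Random(epoch)  # deterministic, epoch-dependent order
rng.shuffle(shard_offsets)

for offset in shard_offsets:
    scanner = ds.scanner(offset=offset, limit=shard_size)
    for batch in scanner.to_batches():
        ...  # convert the batch to tensors and feed the model
```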

Checkpoints

Model training creates checkpoints. All of the above data loading scenarios should support creating and restarting from checkpoints. For example, when shard shuffling, we can give each shard a unique ID and save off which shards have been processed and resume the process at a later date.
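
As a hypothetical sketch (not a committed API), the shard-shuffling loader above could be checkpointed by recording the epoch, the shuffled shard order, and the set of completed shard IDs:

```python
# Checkpoint state for shard shuffling: each shard's offset doubles as a
# stable shard ID. On restart, completed shards are simply skipped.
import json


def save_checkpoint(path, epoch, shard_order, completed):
    with open(path, "w") as f:
        json.dump(
            {"epoch": epoch,
             "shard_order": shard_order,
             "completed": sorted(completed)},
            f,
        )


def load_checkpoint(path):
    with open(path) as f:
        state = json.load(f)
    return state["epoch"], state["shard_order"], set(state["completed"])
```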

This will require both documentation and features (e.g. being able to create a data loader from a partial state). The actual act of saving the checkpoint is potentially out of scope for Lance as the training library may have support for this already. However, we should clearly document how this is done in the major training libraries. Alternatively, we could offer a basic checkpoint storage mechanism.

Parallelize / batch conversion to tensors, provide more examples

The data loader currently has a to_tensor_function. However, in many cases conversion to tensors can be a very expensive process; for example, we may be decoding images. Since the current function runs on a row-by-row basis and is not parallelized, it can easily become the bottleneck in data loading. We should make it possible to parallelize this process.
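
A hedged sketch of what batch-level, parallelized conversion could look like (the column names and the Pillow-based decoder are illustrative only):

```python
# Convert a whole pyarrow RecordBatch to tensors, decoding the expensive
# image column on a thread pool instead of row by row.
import io
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import torch
from PIL import Image

_pool = ThreadPoolExecutor(max_workers=8)


def _decode_image(jpeg_bytes: bytes) -> torch.Tensor:
    img = Image.open(io.BytesIO(jpeg_bytes)).convert("RGB")
    return torch.from_numpy(np.asarray(img).copy()).permute(2, 0, 1)


def batch_to_tensors(batch) -> dict:
    # "image" and "label" are hypothetical column names; assumes all images
    # in the batch decode to the same shape.
    images = list(_pool.map(_decode_image, batch.column("image").to_pylist()))
    labels = torch.tensor(batch.column("label").to_pylist())
    return {"image": torch.stack(images), "label": labels}
```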

Better tracing / profiling

We should provide APIs to get basic performance information from the data loader so that it is easier to identify and fix bottlenecks. For example, after a run we could log:

  • How much time was spent waiting for I/O, what was our effective I/O bandwidth?
  • How much time was spent decoding and converting to tensor?
  • How many rows per second did the data loader emit? How many bytes per second did it emit?
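
As a hypothetical sketch of the shape such an API could take (names are illustrative, not a committed interface):

```python
# Illustrative stats object the data loader could expose after (or during) a run.
from dataclasses import dataclass


@dataclass
class LoaderStats:
    io_wait_seconds: float   # time spent blocked on I/O
    io_bytes_read: int       # bytes read from storage
    decode_seconds: float    # time spent decoding / converting to tensors
    rows_emitted: int
    bytes_emitted: int
    wall_seconds: float

    @property
    def effective_io_bandwidth(self) -> float:
        return self.io_bytes_read / self.io_wait_seconds if self.io_wait_seconds else 0.0

    @property
    def rows_per_second(self) -> float:
        return self.rows_emitted / self.wall_seconds if self.wall_seconds else 0.0
```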

Create a complete guide for data loading

This can cover all of the above as well as give performance recommendations for several scenarios. For example, we could discuss the benefits of offline processing and creating materialized views (copies) of the data if there are many epochs to run.
