feat(server): Add multipart upload endpoints#463
Conversation
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as off-topic.
This comment was marked as off-topic.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
✅ Bugbot reviewed your changes and found no new issues!
Comment @cursor review or bugbot run to trigger another review on this PR
Reviewed by Cursor Bugbot for commit 5a3cfa2. Configure here.
| /// | ||
| /// The default returns `None`. Backends that implement | ||
| /// [`MultipartUploadBackend`] should override this to return `Some(self)`. | ||
| fn as_multipart_upload_backend(&self) -> Option<&dyn MultipartUploadBackend> { |
There was a problem hiding this comment.
Not sure if this is the best approach.
Backend should not know about MultipartUploadBackend, and this means that the server errors with HTTP status NOT IMPLEMENTED when the backend doesn't support this capability.
As an alternative, we could make StorageService store a Arc<dyn MultipartUploadBackend>.
That however would prevent someone from using Objectstore with a Bigtable only configuration, which I'm not sure if we consider a legitimate usecase or not.
Regardless, we can easily make that change at any point.
| headers: HeaderMap, | ||
| ) -> ApiResult<Response> { | ||
| let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; | ||
| // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. |
There was a problem hiding this comment.
This is easy to do but we would need a new method on Backend, plus all the implementations and plumbing through Service, filed a follow-up: https://linear.app/getsentry/issue/FS-356/set-correct-metadatatime-created-for-multipart-uploads.
Or, we can just accept and document that time_created refers to the time the upload was initiated, but that sounds wrong, it should probably be the time the logical object is created, which is the time the multipart upload is completed.
| max_parts: Option<u32>, | ||
| part_number_marker: Option<PartNumber>, | ||
| ) -> ApiResult<ListPartsResponse> { | ||
| self.assert_authorized(Permission::ObjectRead, id.context())?; |
There was a problem hiding this comment.
Maybe this should also require Permission::ObjectWrite instead, to match the rest of the methods.
But also, if a user has ObjectWrite they would usually also have ObjectRead in any realistic scenario.
There was a problem hiding this comment.
not sure what to do here. GCS just gives listing its own permission
if a user has ObjectWrite they would usually also have ObjectRead in any realistic scenario
that might be how we assign permissions today, but i think probably most tokens only need to be read or write.
if the point is to allow an interrupted write to continue, maybe write permissions should allow it? so a write-only client doesn't need to request additional permissions. idk
409be20 to
d6eb979
Compare
5babbfb to
c33ba6f
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## lcian/feat/multipart-upload-tiered #463 +/- ##
======================================================================
+ Coverage 86.95% 87.16% +0.20%
======================================================================
Files 77 78 +1
Lines 11685 12032 +347
======================================================================
+ Hits 10161 10488 +327
- Misses 1524 1544 +20
☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| max_parts: Option<u32>, | ||
| part_number_marker: Option<PartNumber>, | ||
| ) -> ApiResult<ListPartsResponse> { | ||
| self.assert_authorized(Permission::ObjectRead, id.context())?; |
There was a problem hiding this comment.
not sure what to do here. GCS just gives listing its own permission
if a user has ObjectWrite they would usually also have ObjectRead in any realistic scenario
that might be how we assign permissions today, but i think probably most tokens only need to be read or write.
if the point is to allow an interrupted write to continue, maybe write permissions should allow it? so a write-only client doesn't need to request additional permissions. idk
| objectstore_log::error!(!!self, "error handling request"); | ||
| StatusCode::INTERNAL_SERVER_ERROR | ||
| } | ||
|
|
||
| ApiError::Internal(_) => { | ||
| objectstore_log::error!(!!self, "internal error"); |
There was a problem hiding this comment.
are we dropping important errors from these logs?
| Self::Panic(_) => Level::ERROR, | ||
| Self::Dropped => Level::ERROR, | ||
| Self::UnexpectedTombstone => Level::ERROR, | ||
| Self::NotImplemented => Level::WARN, |
There was a problem hiding this comment.
this is probably an error? if this ever happens in a deployment it means something was misconfigured
| fn ensure_inner_multipart(&self) -> Result<()> { | ||
| self.inner | ||
| .as_multipart_upload_backend() | ||
| .ok_or(Error::NotImplemented)?; | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Initiates a new multipart upload. | ||
| pub async fn initiate_multipart( | ||
| &self, | ||
| id: ObjectId, | ||
| metadata: Metadata, | ||
| ) -> Result<InitiateMultipartResponse> { | ||
| self.ensure_inner_multipart()?; | ||
| let inner = Arc::clone(&self.inner); | ||
| self.spawn("initiate_multipart", async move { | ||
| inner | ||
| .as_multipart_upload_backend() | ||
| .unwrap() | ||
| .initiate_multipart(&id, &metadata) | ||
| .await | ||
| }) | ||
| .await | ||
| } |
There was a problem hiding this comment.
there's a way to streamline this a bit to get rid of the ensure_... helper and .unwrap() calls:
pub trait Backend {
/// Make this an explicit `Arc<dyn T>` -> `Arc<dyn U>` cast.
/// Return `Err(Error::NotImplemented)` by default.
fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
Err(Error::NotImplemented)
}
}
impl Backend for GcsBackend {
/// Convert `Arc<dyn Backend>` into `Arc<dyn MultipartUploadBackend>`
fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
Ok(self)
}
}
impl StorageService {
pub async fn initiate_multipart(...) -> Result<InitiateMultipartResponse> {
// Clone `inner` so it can be `move`d into the async block, as before.
// `.as_multipart_upload_backend()` transforms the clone into `Arc<dyn MultipartUploadBackend>`.
// It still shares its underlying pointer/refcount with the original, though!
// `?` to return `Err(Error::NotImplemented)` when not implemented.
let inner_multipart = self.inner.clone().as_multipart_upload_backend()?;
self.spawn("initiate_multipart", async move {
inner_multipart.initiate_multipart(&id, &metadata).await
}).await
}
}the Arc refcounting all still tracks the same object. this is just a super verbose cast for trait objects inside Arcs. you can confirm the refcount stuff in this playground link https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=ea6b827dd6fb9c3194bac54d0f62b2ea
| fn validate_part_number(part_number: u32) -> ApiResult<()> { | ||
| if part_number == 0 { | ||
| return Err(ApiError::Client("part_number must be >= 1".into())); | ||
| } | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
would using a type PartNumber = std::num::NonZero<u32> alias instead of u32 for these part_number fields take care of this? we'd have to move the error handling somewhere else but we wouldn't have to remember to call validate()
| id, | ||
| params.upload_id, | ||
| params.max_parts, | ||
| params.part_number_marker, |
There was a problem hiding this comment.
technically you have to validate that this is >=1 as well
| Query(params): Query<UploadIdQuery>, | ||
| Json(body): Json<CompleteRequest>, | ||
| ) -> ApiResult<Response> { | ||
| service.check_permission(Permission::ObjectWrite, id.context())?; |
There was a problem hiding this comment.
is there something we need to check that isn't covered in AuthAwareService's wrapper around complete_multipart()?
Final part of the Multipart uploads implementation server-side, this implements the endpoints on top of the Service.
Close FS-341