Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ setup: build
echo "Current context is not k3d-beta9"; \
exit 1; \
fi
cd hack; kubectl apply -f deployment.yaml; cd ..
helm install blobcache-redis oci://registry-1.docker.io/bitnamicharts/redis --set architecture=standalone --set auth.password=password
cd hack; kubectl apply -f deployment.yaml; kubectl apply -f service.yaml; cd ..
14 changes: 14 additions & 0 deletions hack/svc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: blobcache
namespace: beta9
spec:
selector:
app: blobcache
ports:
- name: grpc
protocol: TCP
port: 2049
targetPort: 2049
type: ClusterIP
24 changes: 9 additions & 15 deletions pkg/blobfs_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,13 @@ func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*

n.log("Storing content from source with path: %s", sourcePath)

cacheSource := struct {
Path string
}{
cacheSource := ContentSourceFUSE{
Path: sourcePath,
}
_, err := n.filesystem.Client.StoreContentFromFUSE(cacheSource, struct {
RoutingKey string
Lock bool
}{
RoutingKey: sourcePath,
Lock: true,
_, err := n.filesystem.Client.StoreContentFromFUSE(cacheSource, StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: sourcePath,
Lock: true,
})
if err != nil {
return nil, syscall.ENOENT
Expand Down Expand Up @@ -197,12 +193,10 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int
}{
Path: sourcePath,
}
_, err = n.filesystem.Client.StoreContentFromFUSE(cacheSource, struct {
RoutingKey string
Lock bool
}{
RoutingKey: sourcePath,
Lock: true,
_, err = n.filesystem.Client.StoreContentFromFUSE(cacheSource, StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: sourcePath,
Lock: true,
})
// If multiple clients try to store the same file, some may get ErrUnableToAcquireLock
// In this case, we should tell the client to retry the Read instead of returning an error
Expand Down
106 changes: 54 additions & 52 deletions pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ const (
readAheadKB = 32768
)

type ContentSourceS3 struct {
Path string
BucketName string
Region string
EndpointURL string
AccessKey string
SecretKey string
ForcePathStyle bool
}

type ContentSourceFUSE struct {
Path string
}

type StoreContentOptions struct {
CreateCacheFSEntry bool
RoutingKey string
Lock bool
}

type RendezvousHasher interface {
Add(hosts ...*BlobCacheHost)
Remove(host *BlobCacheHost)
Expand Down Expand Up @@ -223,47 +243,39 @@ func (c *BlobCacheClient) monitorHost(host *BlobCacheHost) {
}
}

func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, path string) bool {
metadata, err := c.coordinator.GetFsNode(ctx, GenerateFsID(path))
func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, routingKey string) bool {
metadata, err := c.coordinator.GetFsNode(ctx, GenerateFsID(routingKey))
if err != nil {
Logger.Errorf("error getting fs node: %v, path: %s", err, path)
Logger.Errorf("error getting fs node: %v, path: %s", err, routingKey)
return false
}

exists, err := c.IsCachedNearby(metadata.Hash, path)
if err != nil {
Logger.Errorf("error checking if content is cached nearby: %v, hash: %s", err, metadata.Hash)
return false
}

return exists
}

func (c *BlobCacheClient) IsCachedNearby(hash string, routingKey string) (bool, error) {
hostsToCheck := c.clientConfig.NTopHosts

for hostIndex := 0; hostIndex < hostsToCheck; hostIndex++ {
client, _, err := c.getGRPCClient(&ClientRequest{
rt: ClientRequestTypeRetrieval,
hash: hash,
hash: metadata.Hash,
key: routingKey,
hostIndex: hostIndex,
})
if err != nil {
return false, err
Logger.Errorf("error checking if content is cached nearby: %v, hash: %s", err, metadata.Hash)
return false
}

resp, err := client.HasContent(c.ctx, &proto.HasContentRequest{Hash: hash})
resp, err := client.HasContent(c.ctx, &proto.HasContentRequest{Hash: metadata.Hash})
if err != nil {
return false, err
Logger.Errorf("error checking if content is cached nearby: %v, hash: %s", err, metadata.Hash)
return false
}

if resp.Exists {
return true, nil
return true
}
}

return false, nil
return false
}

func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64, opts struct {
Expand Down Expand Up @@ -498,12 +510,7 @@ func (c *BlobCacheClient) StoreContent(chunks chan []byte, hash string, opts str
return resp.Hash, nil
}

func (c *BlobCacheClient) StoreContentFromFUSE(source struct {
Path string
}, opts struct {
RoutingKey string
Lock bool
}) (string, error) {
func (c *BlobCacheClient) StoreContentFromFUSE(source ContentSourceFUSE, opts StoreContentOptions) (string, error) {
ctx, cancel := context.WithTimeout(c.ctx, storeContentRequestTimeout)
defer cancel()

Expand Down Expand Up @@ -555,17 +562,7 @@ func (c *BlobCacheClient) StoreContentFromFUSE(source struct {
return resp.Hash, nil
}

func (c *BlobCacheClient) StoreContentFromS3(source struct {
Path string
BucketName string
Region string
EndpointURL string
AccessKey string
SecretKey string
}, opts struct {
RoutingKey string
Lock bool
}) (string, error) {
func (c *BlobCacheClient) StoreContentFromS3(source ContentSourceS3, opts StoreContentOptions) (string, error) {
ctx, cancel := context.WithTimeout(c.ctx, storeContentRequestTimeout)
defer cancel()

Expand All @@ -582,15 +579,27 @@ func (c *BlobCacheClient) StoreContentFromS3(source struct {
return "", err
}

sourceType := proto.CacheSourceType_FUSE
if source.BucketName != "" {
sourceType = proto.CacheSourceType_S3
}

sourceProto := proto.StoreContentFromSourceRequest{
Source: &proto.CacheSource{
Path: source.Path,
BucketName: source.BucketName,
Region: source.Region,
EndpointUrl: source.EndpointURL,
AccessKey: source.AccessKey,
SecretKey: source.SecretKey,
ForcePathStyle: source.ForcePathStyle,
},
SourceType: sourceType,
CreateCacheFsEntry: opts.CreateCacheFSEntry,
}

if opts.Lock {
resp, err := client.StoreContentFromSourceWithLock(ctx, &proto.StoreContentFromSourceRequest{Source: &proto.CacheSource{
Path: source.Path,
BucketName: source.BucketName,
Region: source.Region,
EndpointUrl: source.EndpointURL,
AccessKey: source.AccessKey,
SecretKey: source.SecretKey,
}})
resp, err := client.StoreContentFromSourceWithLock(ctx, &sourceProto)
if err != nil {
return "", err
}
Expand All @@ -606,14 +615,7 @@ func (c *BlobCacheClient) StoreContentFromS3(source struct {
return resp.Hash, nil
}

resp, err := client.StoreContentFromSource(ctx, &proto.StoreContentFromSourceRequest{Source: &proto.CacheSource{
Path: source.Path,
BucketName: source.BucketName,
Region: source.Region,
EndpointUrl: source.EndpointURL,
AccessKey: source.AccessKey,
SecretKey: source.SecretKey,
}})
resp, err := client.StoreContentFromSource(ctx, &sourceProto)
if err != nil {
return "", err
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/s3_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ type S3Client struct {
}

type S3SourceConfig struct {
BucketName string
Region string
EndpointURL string
AccessKey string
SecretKey string
BucketName string
Region string
EndpointURL string
AccessKey string
SecretKey string
ForcePathStyle bool
}

func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig) (*S3Client, error) {
Expand All @@ -38,11 +39,12 @@ func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig) (*S3Client, e
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}

if sourceConfig.EndpointURL != "" {
cfg.BaseEndpoint = aws.String(sourceConfig.EndpointURL)
}

s3Client := s3.NewFromConfig(cfg)
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
if sourceConfig.EndpointURL != "" {
o.BaseEndpoint = aws.String(sourceConfig.EndpointURL)
}
o.UsePathStyle = sourceConfig.ForcePathStyle
})
return &S3Client{
Client: s3Client,
Source: sourceConfig,
Expand Down
Loading