Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions packages/extension-redis/src/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class Redis implements Extension {
this.sub.on("messageBuffer", this.handleIncomingMessage);

this.redlock = new Redlock([this.pub], {
driftFactor: 0.1
retryCount: 0,
});

const identifierBuffer = Buffer.from(
Expand Down Expand Up @@ -241,12 +241,23 @@ export class Redis implements Extension {
// to avoid conflict with other instances storing the same document.
const resource = this.lockKey(documentName)
const ttl = this.configuration.lockTimeout
const lock = await this.redlock.acquire([resource], ttl)
const oldLock = this.locks.get(resource)
if (oldLock) {
await oldLock.release
}
this.locks.set(resource, {lock})
try {
await this.redlock.acquire([resource], ttl)
const oldLock = this.locks.get(resource)
if (oldLock) {
await oldLock.release;
}
} catch (error) {
//based on: https://github.com/sesamecare/redlock/blob/508e00dcd1e4d2bc6373ce455f4fe847e98a9aab/src/index.ts#L347-L349
if(error == 'ExecutionError: The operation was unable to achieve a quorum during its retry window.') {
// Expected behavior: Could not acquire lock, another instance locked it already.
// No further `onStoreDocument` hooks will be executed; should throw a silent error with no message.
throw new Error('', { cause: 'Could not acquire lock, another instance locked it already.' });
}
//unexpected error
console.error("unexpected error:", error);
throw error
}
}

/**
Expand All @@ -255,17 +266,16 @@ export class Redis implements Extension {
async afterStoreDocument({documentName, socketId}: afterStoreDocumentPayload) {
const lockKey = this.lockKey(documentName)
const lock = this.locks.get(lockKey)
if (!lock) {
throw new Error(`Lock created in onStoreDocument not found in afterStoreDocument: ${lockKey}`)
}
try {
// Always try to unlock and clean up the lock
lock.release = lock.lock.release()
await lock.release
} catch {
// Lock will expire on its own after timeout
} finally {
this.locks.delete(lockKey)
if (lock) {
try {
// Always try to unlock and clean up the lock
lock.release = lock.lock.release()
await lock.release
} catch {
// Lock will expire on its own after timeout
} finally {
this.locks.delete(lockKey)
}
}
// if the change was initiated by a directConnection, we need to delay this hook to make sure sync can finish first.
// for provider connections, this usually happens in the onDisconnect hook
Expand Down
Loading