Adds out-of-the-box support for SQS message broker #197
thesujai wants to merge 3 commits into nilenso:main
Conversation
@olttwa Just a small review here for this incomplete work before I move forward.
olttwa left a comment
Thanks for showing interest in contributing to Goose, @thesujai. I have left some comments.
Adding a message broker involves a lot of complex work, which could be difficult to do without adequate context on Goose. If you are keen on contributing to Goose, I suggest starting with some beginner-friendly issues like #179, #144, #75, #46, #37 or #10.
```clojure
(close [_]
  (println "SQS client does not require explicit closure.")))

(def default-opts
```
The default-opts could point to an instance of SQS hosted locally for development purposes, for instance localstack, or vsouza's or roribio16's SQS docker images.
Thank you for letting me know that this existed! 😄
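A hypothetical sketch of such development defaults, assuming a localstack container listening on its default port 4566; the map keys follow cognitect's aws-api (`:endpoint-override`, `:credentials-provider`), and the credentials are dummies that localstack accepts:

```clojure
(require '[cognitect.aws.credentials :as credentials])

;; Sketch only: point the SQS client at a local localstack instance,
;; e.g. `docker run -p 4566:4566 localstack/localstack`.
(def default-opts
  {:api                  :sqs
   :region               "us-east-1"
   ;; localstack does not validate credentials, so dummies suffice.
   :credentials-provider (credentials/basic-credentials-provider
                           {:access-key-id     "test"
                            :secret-access-key "test"})
   :endpoint-override    {:protocol :http
                          :hostname "localhost"
                          :port     4566}})
```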
```clojure
Close
(close [_]
  (println "SQS client does not require explicit closure.")))
```
Close is not required to be implemented as part of the Broker protocol if it's not needed.
Okay!
Just a follow-up question: RMQ doesn't implement some methods from the broker protocol, for instance the cron-job function, but we can still call it through the client's perform-every. Though it ends up throwing a runtime error, is there a way to let developers know that they cannot do this (just so their IDE/LSP can warn them)? (I know it is already mentioned in the docs.)
```clojure
(defn enqueue
  "Enqueues a job for immediate execution."
  [client queue-url job]
  (aws/invoke client {:op :SendMessage
```
Is there an underlying connection pool that is utilized by aws/invoke? Could we have access to this pool and pass it in explicitly as a dependency?
No, as of now it does not expose a direct connection pool. We can customize it, though: https://github.com/cognitect-labs/aws-api?tab=readme-ov-file#overriding-the-http-client
The cognitect API uses a shared HTTP client internally to cache connections and pool outgoing requests.
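A sketch of sharing that client explicitly, per the README section linked above; the `default-http-client` constructor name is from memory of aws-api's docs and should be verified:

```clojure
(require '[cognitect.aws.client.api :as aws])

;; Build the HTTP client once and pass it to every service client,
;; so all outgoing requests share one connection pool.
(def http-client (aws/default-http-client))

(def sqs-client (aws/client {:api :sqs :http-client http-client}))
(def s3-client  (aws/client {:api :s3  :http-client http-client}))
```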
```clojure
(let [{:keys [client]} this
      {:keys [queue-url]} opts]
  (sqs-request/enqueue client queue-url (u/encode-to-str job))))
```
Why encode-to-str instead of encode?
First, the reason to encode was that the json/write-str used in sqs_requests.clj ignored namespaces, so a function like goose.core/process-job would turn into just process-job (it took hours to figure this out when I was implementing it).
encode-to-str converts the job to a simple string that can be serialized and deserialized into JSON, which is more readable IMO.
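The namespace loss can be reproduced at a REPL: clojure.data.json's default key coercion calls `name` on keywords, which drops the namespace. Assuming encode-to-str is essentially a `pr-str` round-trip, it sidesteps this:

```clojure
(require '[clojure.data.json :as json])

;; The keyword's namespace disappears under the default key-fn:
(json/write-str {:goose.core/process-job 1})
;; => "{\"process-job\":1}"

;; Printing the whole form to an EDN string first keeps it intact;
;; that string can then be carried safely inside a JSON payload.
(pr-str {:goose.core/process-job 1})
;; => "{:goose.core/process-job 1}"
```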
```clojure
(sqs-request/enqueue client queue-url (u/encode-to-str job))))

(start-worker
```
For background processing, clients might want to write to different queues, and a worker could process jobs off different queues. We need to design the enqueue & worker API keeping this in mind.
By different queues, do you mean different SQS services (in the same account) with different queues within each service? Or do you mean we will allow only one AWS account with an SQS service where they can connect to multiple queues?
Or do you also mean support for different AWS accounts?
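Whichever interpretation holds, one common shape (all names here are hypothetical, not Goose's actual API) is to make the queue per-call data rather than baking it into the broker, so one broker can serve many queues and one worker can poll several:

```clojure
;; Hypothetical API sketch: the queue URL is an option, not part of
;; the broker itself, so the same broker fans out to several queues.
(enqueue broker (assoc client-opts :queue "email-queue-url") job)
(enqueue broker (assoc client-opts :queue "billing-queue-url") job)

;; One worker polling multiple queues.
(start-worker broker (assoc worker-opts :queues ["email-queue-url"
                                                 "billing-queue-url"]))
```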
```clojure
  `opts` : Additional options required for job execution."
  [client queue-url opts]
  (let [continue? (atom true)]
    (async/go-loop []
```
Goose uses claypoole instead of async for concurrent execution.
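A minimal sketch of the same polling loop on a claypoole pool instead of a core.async go-loop; `receive-messages` and `process-fn` are assumed helpers, not functions from this PR:

```clojure
(require '[com.climate.claypoole :as cp])

;; Poll on a fixed-size claypoole pool: one future runs the polling
;; loop, and each received message is executed on a pool thread.
(defn start-polling [client queue-url threads process-fn]
  (let [pool      (cp/threadpool threads)
        continue? (atom true)]
    (cp/future pool
      (while @continue?
        (doseq [msg (receive-messages client queue-url)] ; assumed helper
          (cp/future pool (process-fn msg)))))
    (fn stop-polling []
      (reset! continue? false)
      (cp/shutdown pool))))
```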
```clojure
(catch Exception e
  (println "Error executing job:" e))))))
(recur))))
(fn stop-polling []
```
Worker needs to return an implementation of goose.worker/stop for client to stop it.
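A sketch of returning that instead of a bare function; the protocol name `Shutdown` is assumed from the review's mention of goose.worker/stop and should be checked against the worker namespace:

```clojure
(require '[goose.worker :as w])

;; Assumption: goose.worker defines a protocol (called Shutdown here)
;; whose `stop` method clients call to stop the worker.
(reify w/Shutdown
  (stop [_]
    ;; Flip the polling flag so the worker loop exits.
    (reset! continue? false)))
```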
```clojure
:request {:QueueUrl queue-url
          :MaxNumberOfMessages 1
          :WaitTimeSeconds 10}})]
(when-let [messages (:Messages response)]
```
Dequeuing a job, holding a lock over the job to avoid double execution, executing the job, enqueuing it for retry/death upon failure, and deleting it from the queue post-processing are very complex operations. This section needs a lot of rework keeping all these things in mind.
Check out redis.worker/start and rmq.worker/start for reference.
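A sketch of that lifecycle mapped onto SQS primitives, where the visibility timeout plays the role of the lock: a received message is hidden from other consumers until it is deleted or the timeout lapses. Retry/death bookkeeping, as in redis.worker/start, would still need to be layered on top; `execute-job` is an assumed helper.

```clojure
(defn run-one
  "Receive at most one message, execute it, and delete it only on success."
  [client queue-url execute-job]
  (let [{:keys [Messages]}
        (aws/invoke client {:op :ReceiveMessage
                            :request {:QueueUrl queue-url
                                      :MaxNumberOfMessages 1
                                      :WaitTimeSeconds 10
                                      ;; Message stays invisible to other
                                      ;; consumers for 60s: the "lock".
                                      :VisibilityTimeout 60}})]
    (doseq [{:keys [Body ReceiptHandle]} Messages]
      (try
        (execute-job Body)
        ;; Delete only after successful execution.
        (aws/invoke client {:op :DeleteMessage
                            :request {:QueueUrl queue-url
                                      :ReceiptHandle ReceiptHandle}})
        (catch Exception _
          ;; On failure, leave the message; it reappears after the
          ;; visibility timeout and will be retried.
          nil)))))
```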
Hi @thesujai, are you planning to address the review feedback?

Hi @ashutoshgngwr, I was planning to come back to this, sorry I missed it. I will address the feedback within a few days.

Hey @thesujai, I'm assigning this PR to myself and will continue working on this. Thank you for your contributions!

@ashutoshgngwr do you need assistance with this?

@valerauko Yes, please! I haven't had the time to make any progress on this.

@ashutoshgngwr I'll catch up on the changes proposed and the reviews above, then start working on this. Give me a few days until I can start pushing commits.
Fixes #106
This is very incomplete right now. All we can do is a simple enqueue using perform-async and execute the job.
Remaining things are: