YARN-11236. Implement FederationReservationHomeSubClusterStore With MemoryStore.#4711
YARN-11236. Implement FederationReservationHomeSubClusterStore With MemoryStore.#4711goiri merged 9 commits intoapache:trunkfrom
Conversation
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Could you please help to review the code? Thank you very much! |
| // note this might throw YarnException if the reservation is | ||
| // unknown. This is to be expected, and should be handled by | ||
| // policy invoker. | ||
| SubClusterId resSubCluster = getPolicyContext().getFederationStateStoreFacade(). |
There was a problem hiding this comment.
Thank you very much for your help reviewing the code, I will fix it.
| SubClusterId resSubCluster = getPolicyContext().getFederationStateStoreFacade(). | ||
| getReservationHomeSubCluster(reservationId); | ||
|
|
||
| return Collections.singletonMap(resSubCluster, activeSubClusters.get(resSubCluster)); |
| public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( | ||
| AddReservationHomeSubClusterRequest request) throws YarnException { | ||
| FederationReservationHomeSubClusterStoreInputValidator | ||
| .validateAddReservationHomeSubClusterRequest(request); |
There was a problem hiding this comment.
These names are pretty verose.
If we are in FederationReservationHomeSubClusterStoreInputValidator isn't enough to say validateRequest? or validate as the type is in the arg?
There was a problem hiding this comment.
Thanks for your suggestion, I have modified the code, the method is called validate()
| * against | ||
| * @throws FederationStateStoreInvalidInputException if the request is invalid | ||
| */ | ||
| public static void validateGetReservationHomeSubClusterRequest( |
There was a problem hiding this comment.
validate() should be enough as the type is there.
There was a problem hiding this comment.
I have modified the code, the method is called validate()
| private static void checkReservationId(ReservationId reservationId) | ||
| throws FederationStateStoreInvalidInputException { | ||
| if (reservationId == null) { | ||
| String message = "Missing ReservationId." |
| .DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); | ||
| if (heartbeatInterval <= 0) { | ||
| heartbeatInterval = | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS; |
There was a problem hiding this comment.
Can we leave the these changes as they were?
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Please help to review the code again, Thank you very much! |
| if (!reservations.containsKey(reservationId)) { | ||
| throw new YarnException("Reservation " + reservationId + " does not exist"); | ||
| } | ||
| return GetReservationHomeSubClusterResponse.newInstance( |
There was a problem hiding this comment.
extractreservations.get(reservationId))
There was a problem hiding this comment.
Thanks for your help reviewing the code, I will fix it.
| GetReservationsHomeSubClusterRequest request) throws YarnException { | ||
| List<ReservationHomeSubCluster> result = new ArrayList<>(); | ||
| for (Entry<ReservationId, SubClusterId> e : reservations.entrySet()) { | ||
| result.add(ReservationHomeSubCluster.newInstance(e.getKey(), e.getValue())); |
| @Override | ||
| public GetReservationHomeSubClusterResponse getReservationHomeSubCluster( | ||
| GetReservationHomeSubClusterRequest request) throws YarnException { | ||
| return null; |
There was a problem hiding this comment.
Should we throw the not implemented exception?
| public SubClusterId getReservationHomeSubCluster(ReservationId reservationId) | ||
| throws YarnException { | ||
| GetReservationHomeSubClusterResponse response = | ||
| stateStore.getReservationHomeSubCluster( |
| // route an application that uses this app | ||
| ApplicationSubmissionContext applicationSubmissionContext = | ||
| ApplicationSubmissionContext.newInstance( | ||
| ApplicationId.newInstance(System.currentTimeMillis(), 1), "app1", |
There was a problem hiding this comment.
Kind of out of scope but should we use Time.now()?
| "queue1", Priority.newInstance(1), null, false, false, 1, null, | ||
| null, false); | ||
| applicationSubmissionContext.setReservationID(resReq.getReservationId()); | ||
| SubClusterId chosen2 = ((FederationRouterPolicy) getPolicy()) |
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Please help to review the code again, Thank you very much! |
| public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( | ||
| AddReservationHomeSubClusterRequest request) throws YarnException { | ||
| FederationReservationHomeSubClusterStoreInputValidator.validate(request); | ||
| ReservationId reservationId = |
| request.getReservationHomeSubCluster().getReservationId(); | ||
| if (!reservations.containsKey(reservationId)) { | ||
| reservations.put(reservationId, | ||
| request.getReservationHomeSubCluster().getHomeSubCluster()); |
| } | ||
| SubClusterId subClusterId = reservations.get(reservationId); | ||
| return GetReservationHomeSubClusterResponse.newInstance( | ||
| ReservationHomeSubCluster.newInstance(reservationId, subClusterId)); |
| List<ReservationHomeSubCluster> result = new ArrayList<>(); | ||
|
|
||
| for (Entry<ReservationId, SubClusterId> entry : reservations.entrySet()) { | ||
| ReservationId key = entry.getKey(); |
There was a problem hiding this comment.
better names than key/value; reservationId, subclusterId
| @Unstable | ||
| public static ReservationHomeSubCluster newInstance(ReservationId appId, | ||
| SubClusterId homeSubCluster) { | ||
| ReservationHomeSubCluster appMapping = |
| @Override | ||
| public void setReservationId(ReservationId reservationId) { | ||
| maybeInitBuilder(); | ||
| if (reservationId == null) { |
There was a problem hiding this comment.
Don't hide the field: resId as param
There was a problem hiding this comment.
Thanks for your suggestion, I will modify the code.
| return null; | ||
| } | ||
| this.reservationId = convertFromProtoFormat(p.getReservationId()); | ||
| return reservationId; |
| if (homeSubCluster == null) { | ||
| builder.clearHomeSubCluster(); | ||
| } | ||
| this.homeSubCluster = homeSubCluster; |
| long now = Time.now(); | ||
| ReservationSubmissionRequest resReq = getReservationSubmissionRequest(); | ||
| when(resReq.getQueue()).thenReturn("queue1"); | ||
| when(resReq.getReservationId()) |
|
@goiri Please help to review the code again, Thank you very much! |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Can you help to merge this pr into trunk branch? I will follow up with YARN-11252, Thank you very much! |
|
Can we get a clean build? |
@goiri Thank you for your suggestion. I personally feel that this may be unavoidable, this part of the code is automatically generated. We can check the following issue, the current version of protobuf does not seem to be able to solve it. I also checked the full compilation log, and I found that something like this is common. We can check the following link: |
|
💔 -1 overall
This message was automatically generated. |
This reverts commit cbf67f5.
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Thank you very much for your help reviewing the code! |
JIRA:YARN-11236. Implement FederationReservationHomeSubClusterStore With MemoryStore.
This pr mainly implements the methods of AddReservation and GetAddReservation. This version completes the basic logic first, and only implements the part of MemoryStore.