@@ -339,6 +339,7 @@ struct
339339 ( HH.promoteChunks thread
340340 ; HH.setDepth (thread, depth)
341341 ; DE.decheckJoin (tidLeft, tidRight)
342+ ; maybeParClearSuspectsAtDepth (thread, depth)
342343 (* ; dbgmsg' (fn _ => "join fast at depth " ^ Int.toString depth) *)
343344 (* ; HH.forceNewChunk () *)
344345 ; let
@@ -362,6 +363,7 @@ struct
362363 HH.setDepth (thread, depth);
363364 DE.decheckJoin (tidLeft, tidRight);
364365 setQueueDepth (myWorkerId ()) depth;
366+ maybeParClearSuspectsAtDepth (thread, depth);
365367 (* dbgmsg' (fn _ => "join slow at depth " ^ Int.toString depth); *)
366368 case HM.refDerefNoBarrier rightSideResult of
367369 NONE => die (fn _ => " scheduler bug: join failed: missing result" )
@@ -374,8 +376,83 @@ struct
374376 (extractResult fr, extractResult gr)
375377 end
376378
379+
380+ and simpleParFork thread depth (f: unit -> unit, g: unit -> unit) : unit =
381+ let
382+ val rightSideThread = ref (NONE : Thread.t option)
383+ val rightSideResult = ref (NONE : unit result option)
384+ val incounter = ref 2
377385
378- fun forkGC thread depth (f : unit -> 'a, g : unit -> 'b) =
386+ val (tidLeft, tidRight) = DE.decheckFork ()
387+
388+ fun g ' () =
389+ let
390+ val () = DE.copySyncDepthsFromThread (thread, depth+1 )
391+ val () = DE.decheckSetTid tidRight
392+ val gr = result g
393+ val t = Thread.current ()
394+ in
395+ rightSideThread := SOME t;
396+ rightSideResult := SOME gr;
397+ if decrementHitsZero incounter then
398+ ( setQueueDepth (myWorkerId ()) (depth+1 )
399+ ; threadSwitch thread
400+ )
401+ else
402+ returnToSched ()
403+ end
404+ val _ = push (NormalTask g')
405+ val _ = HH.setDepth (thread, depth + 1 )
406+ (* NOTE: off-by-one on purpose. Runtime depths start at 1. *)
407+ val _ = recordForkDepth depth
408+
409+ val _ = DE.decheckSetTid tidLeft
410+ val fr = result f
411+ val tidLeft = DE.decheckGetTid thread
412+
413+ val gr =
414+ if popDiscard () then
415+ ( HH.promoteChunks thread
416+ ; HH.setDepth (thread, depth)
417+ ; DE.decheckJoin (tidLeft, tidRight)
418+ ; maybeParClearSuspectsAtDepth (thread, depth)
419+ (* ; dbgmsg' (fn _ => "join fast at depth " ^ Int.toString depth) *)
420+ (* ; HH.forceNewChunk () *)
421+ ; let
422+ val gr = result g
423+ in
424+ (* (gr, DE.decheckGetTid thread) *)
425+ gr
426+ end
427+ )
428+ else
429+ ( clear () (* this should be safe after popDiscard fails? *)
430+ ; if decrementHitsZero incounter then () else returnToSched ()
431+ ; case HM.refDerefNoBarrier rightSideThread of
432+ NONE => die (fn _ => " scheduler bug: join failed" )
433+ | SOME t =>
434+ let
435+ val tidRight = DE.decheckGetTid t
436+ in
437+ HH.mergeThreads (thread, t);
438+ HH.promoteChunks thread;
439+ HH.setDepth (thread, depth);
440+ DE.decheckJoin (tidLeft, tidRight);
441+ setQueueDepth (myWorkerId ()) depth;
442+ maybeParClearSuspectsAtDepth (thread, depth);
443+ (* dbgmsg' (fn _ => "join slow at depth " ^ Int.toString depth); *)
444+ case HM.refDerefNoBarrier rightSideResult of
445+ NONE => die (fn _ => " scheduler bug: join failed: missing result" )
446+ | SOME gr => gr
447+ end
448+ )
449+ in
450+ (extractResult fr, extractResult gr);
451+ ()
452+ end
453+
454+
455+ and forkGC thread depth (f : unit -> 'a, g : unit -> 'b) =
379456 let
380457 val heapId = ref (HH.getRoot thread)
381458 val gcTaskTuple = (thread, heapId)
@@ -416,6 +493,7 @@ struct
416493
417494 val _ = HH.promoteChunks thread
418495 val _ = HH.setDepth (thread, depth)
496+ val _ = maybeParClearSuspectsAtDepth (thread, depth)
419497 (* val _ = dbgmsg' (fn _ => "join CC at depth " ^ Int.toString depth) *)
420498 in
421499 result
@@ -437,7 +515,55 @@ struct
437515 (f (), g ())
438516 end
439517
440- fun fork (f, g) = fork' {ccOkayAtThisDepth=true } (f, g)
518+ and fork (f, g) = fork' {ccOkayAtThisDepth=true } (f, g)
519+
520+ and simpleFork (f, g) =
521+ let
522+ val thread = Thread.current ()
523+ val depth = HH.getDepth thread
524+ in
525+ (* if ccOkayAtThisDepth andalso depth = 1 then *)
526+ if depth < Queue.capacity andalso depthOkayForDECheck depth then
527+ simpleParFork thread depth (f, g)
528+ else
529+ (* don't let us hit an error, just sequentialize instead *)
530+ (f (); g ())
531+ end
532+
533+ and maybeParClearSuspectsAtDepth (t, d) =
534+ if HH.numSuspectsAtDepth (t, d) <= 10000 then
535+ HH.clearSuspectsAtDepth (t, d)
536+ else
537+ let
538+ val cs = HH.takeClearSetAtDepth (t, d)
539+ val count = HH.numChunksInClearSet cs
540+ val grainSize = 20
541+ val numGrains = 1 + (count-1 ) div grainSize
542+ val results = ArrayExtra.alloc numGrains
543+ fun start i = i*grainSize
544+ fun stop i = Int.min (grainSize + start i, count)
545+
546+ fun processLoop i j =
547+ if j-i = 1 then
548+ Array.update (results, i, HH.processClearSetGrain (cs, start i, stop i))
549+ else
550+ let
551+ val mid = i + (j-i) div 2
552+ in
553+ simpleFork (fn _ => processLoop i mid, fn _ => processLoop mid j)
554+ end
555+
556+ fun commitLoop i =
557+ if i >= numGrains then () else
558+ ( HH.commitFinishedClearSetGrain (t, Array.sub (results, i))
559+ ; commitLoop (i+1 )
560+ )
561+ in
562+ processLoop 0 numGrains;
563+ commitLoop 0 ;
564+ HH.deleteClearSet cs;
565+ maybeParClearSuspectsAtDepth (t, d) (* need to go again, just in case *)
566+ end
441567 end
442568
443569 (* ========================================================================
0 commit comments