-
Notifications
You must be signed in to change notification settings - Fork 243
Expand file tree
/
Copy pathQueuedPaddleOcrAll.cs
More file actions
147 lines (131 loc) · 4.97 KB
/
QueuedPaddleOcrAll.cs
File metadata and controls
147 lines (131 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
using OpenCvSharp;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
using System.Linq;
namespace Sdcb.PaddleOCR;
/// <summary>
/// A class for queuing multiple OCR requests using PaddleOCR.
/// </summary>
/// <remarks>
/// The constructor starts a fixed number of worker tasks. Each worker builds its own
/// <see cref="PaddleOcrAll"/> through the supplied factory and then serves requests taken from a
/// shared bounded queue until this instance is disposed.
/// </remarks>
public class QueuedPaddleOcrAll : IDisposable
{
    private readonly Func<PaddleOcrAll> _factory;
    private readonly BlockingCollection<ThreadedQueueItem> _queue;
    private readonly Task[] _workers;
    private readonly CountdownEvent _countdownEvent;
    private readonly ConcurrentBag<Exception> _constructExceptions = new ConcurrentBag<Exception>();

    // Written by Dispose and read by Run and by every worker thread; volatile so the flag is observed promptly.
    private volatile bool _disposed;

    /// <summary>
    /// Constructs an instance of <see cref="QueuedPaddleOcrAll"/> class.
    /// </summary>
    /// <param name="factory">The function that constructs each individual instance of <see cref="PaddleOcrAll"/>.</param>
    /// <param name="consumerCount">The number of consumers that process the OCR requests; must be at least 1.</param>
    /// <param name="boundedCapacity">The maximum number of queued OCR requests; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="consumerCount"/> or <paramref name="boundedCapacity"/> is less than 1.
    /// </exception>
    /// <exception cref="AggregateException">
    /// At least one invocation of <paramref name="factory"/> threw or returned <see langword="null"/>;
    /// the already-started workers are shut down before this is thrown.
    /// </exception>
    public QueuedPaddleOcrAll(Func<PaddleOcrAll> factory, int consumerCount = 1, int boundedCapacity = 64)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        if (consumerCount < 1)
        {
            // Zero consumers would leave every queued request pending forever.
            throw new ArgumentOutOfRangeException(nameof(consumerCount), consumerCount, "At least one consumer is required.");
        }

        _queue = new BlockingCollection<ThreadedQueueItem>(boundedCapacity);
        _workers = new Task[consumerCount];
        _countdownEvent = new CountdownEvent(consumerCount);
        for (int i = 0; i < consumerCount; i++)
        {
            _workers[i] = Task.Run(ProcessQueue);
        }

        try
        {
            WaitFactoryReady();
        }
        catch (AggregateException)
        {
            // Stop the workers whose factory call succeeded so their PaddleOcrAll instances get disposed.
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Waits for the factory to become ready before processing OCR requests.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The instance of <see cref="QueuedPaddleOcrAll"/> is disposed.</exception>
    /// <exception cref="AggregateException">One or more workers failed to construct their <see cref="PaddleOcrAll"/>.</exception>
    private void WaitFactoryReady()
    {
        if (_disposed) throw new ObjectDisposedException(nameof(QueuedPaddleOcrAll));
        _countdownEvent.Wait();
        if (!_constructExceptions.IsEmpty)
        {
            throw new AggregateException(_constructExceptions);
        }
    }

    /// <summary>
    /// Queues an OCR request to be processed.
    /// </summary>
    /// <param name="src">The image to be recognized.</param>
    /// <param name="recognizeBatchSize">
    /// The number of images recognized with one call.
    /// Zero means single recognition only. Maximum value is limited by the model and hardware.
    /// </param>
    /// <param name="configure">
    /// Configuration action to customize the <see cref="PaddleOcrAll"/> instance before running the OCR.
    /// </param>
    /// <param name="cancellationToken">
    /// The cancellation token. It is checked before queuing, while waiting for queue space, and when a worker dequeues the request.
    /// </param>
    /// <returns>A <see cref="Task"/> that represents the queued OCR operation.</returns>
    /// <exception cref="ObjectDisposedException">The instance of <see cref="QueuedPaddleOcrAll"/> is disposed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled before the request was queued.</exception>
    public Task<PaddleOcrResult> Run(Mat src, int recognizeBatchSize = 0, Action<PaddleOcrAll>? configure = null, CancellationToken cancellationToken = default)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(QueuedPaddleOcrAll));
        cancellationToken.ThrowIfCancellationRequested();

        // Without RunContinuationsAsynchronously, an awaiting caller's continuation would run inline on the
        // worker thread inside TrySetResult, occupying the OCR worker and stalling the whole queue.
        TaskCompletionSource<PaddleOcrResult> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
        try
        {
            _queue.Add(new ThreadedQueueItem(src, recognizeBatchSize, configure, cancellationToken, tcs), cancellationToken);
        }
        catch (InvalidOperationException) when (_disposed)
        {
            // Dispose() completed (or disposed) the queue after the check above; report it as documented.
            throw new ObjectDisposedException(nameof(QueuedPaddleOcrAll));
        }
        return tcs.Task;
    }

    /// <summary>
    /// Worker loop: builds one <see cref="PaddleOcrAll"/> and serves queued requests until adding is completed.
    /// </summary>
    private void ProcessQueue()
    {
        PaddleOcrAll? paddleOcr = null;
        try
        {
            paddleOcr = _factory() ?? throw new InvalidOperationException("The factory returned null instead of a PaddleOcrAll instance.");
        }
        catch (Exception e)
        {
            _constructExceptions.Add(e);
        }
        finally
        {
            _countdownEvent.Signal();
        }

        // The constructor reports the failure and disposes this instance; a worker without an engine must not consume requests.
        if (paddleOcr is null) return;

        using PaddleOcrAll _ = paddleOcr;
        foreach (ThreadedQueueItem item in _queue.GetConsumingEnumerable())
        {
            if (item.CancellationToken.IsCancellationRequested)
            {
                item.TaskCompletionSource.TrySetCanceled(item.CancellationToken);
                continue;
            }

            if (_disposed)
            {
                // Requests still queued at disposal time are drained as canceled rather than processed.
                item.TaskCompletionSource.TrySetCanceled();
                continue;
            }

            try
            {
                item.Configure?.Invoke(paddleOcr);
                PaddleOcrResult result = paddleOcr.Run(item.Source, item.RecognizeBatchSize);
                item.TaskCompletionSource.TrySetResult(result);
            }
            catch (Exception ex)
            {
                item.TaskCompletionSource.TrySetException(ex);
            }
        }
    }

    /// <summary>
    /// Disposes this instance of <see cref="QueuedPaddleOcrAll"/> and releases associated resources.
    /// Pending requests are canceled and all workers are awaited. Calling this more than once has no effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        _queue.CompleteAdding();
        try
        {
            Task.WaitAll(_workers);
        }
        finally
        {
            // Workers have exited (or faulted), so nothing else touches these anymore.
            _countdownEvent.Dispose();
            _queue.Dispose();
        }
    }
}
/// <summary>
/// A single OCR request waiting in the <see cref="QueuedPaddleOcrAll"/> queue.
/// </summary>
/// <param name="Source">The image handed to the worker's <see cref="PaddleOcrAll"/> Run call.</param>
/// <param name="RecognizeBatchSize">The batch size forwarded to that Run call.</param>
/// <param name="Configure">Optional callback invoked on the worker's <see cref="PaddleOcrAll"/> before the Run call.</param>
/// <param name="CancellationToken">Token the worker checks when it dequeues this request.</param>
/// <param name="TaskCompletionSource">Completed by the worker with the OCR result, the thrown exception, or cancellation.</param>
internal record ThreadedQueueItem(
Mat Source,
int RecognizeBatchSize,
Action<PaddleOcrAll>? Configure,
CancellationToken CancellationToken,
TaskCompletionSource<PaddleOcrResult> TaskCompletionSource);