1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import org.apache.commons.pool2.ObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.fail;
39
40 public class TestBatchPool {
41
42 private static final Logger LOG = LoggerFactory.getLogger(TestBatchPool.class);
43
44 private static final int CONCURRENT_WRITERS = 16;
45 private static final int BATCH_SIZE = 1000;
46
47
48 private Injector injector;
49
50 @BeforeMethod
51 void setup() {
52
53 TSOServerConfig tsoServerConfig = new TSOServerConfig();
54 tsoServerConfig.setNumConcurrentCTWriters(CONCURRENT_WRITERS);
55 tsoServerConfig.setBatchSizePerCTWriter(BATCH_SIZE);
56
57
58 injector = Guice.createInjector(new BatchPoolModule(tsoServerConfig));
59
60 }
61
62 @Test(timeOut = 10_000)
63 public void testBatchPoolObtainedIsSingleton() {
64
65 final ObjectPool<Batch> instance1 = injector.getInstance(Key.get(new TypeLiteral<ObjectPool<Batch>>() {}));
66 final ObjectPool<Batch> instance2 = injector.getInstance(Key.get(new TypeLiteral<ObjectPool<Batch>>() {}));
67
68 assertEquals(instance1, instance2, "Objects are NOT equal !");
69
70 }
71
72 @Test(timeOut = 10_000)
73 public void testBatchPoolInitializesAllBatchObjectsAsIdle() throws Exception {
74
75 final ObjectPool<Batch> batchPool = injector.getInstance(Key.get(new TypeLiteral<ObjectPool<Batch>>() {}));
76
77 assertEquals(batchPool.getNumActive(), 0);
78 assertEquals(batchPool.getNumIdle(), CONCURRENT_WRITERS);
79
80
81 for (int i = 0; i < CONCURRENT_WRITERS; i++) {
82 batchPool.borrowObject();
83 }
84
85 assertEquals(batchPool.getNumActive(), CONCURRENT_WRITERS);
86 assertEquals(batchPool.getNumIdle(), 0);
87
88 }
89
90 @Test(timeOut = 10_000)
91 public void testBatchPoolBlocksWhenAllObjectsAreActive() throws Exception {
92
93 ExecutorService executor = Executors.newCachedThreadPool();
94
95 final ObjectPool<Batch> batchPool = injector.getInstance(Key.get(new TypeLiteral<ObjectPool<Batch>>() {}));
96
97
98 for (int i = 0; i < CONCURRENT_WRITERS + 1; i++) {
99
100
101 Callable<Batch> task = new Callable<Batch>() {
102 public Batch call() throws Exception {
103 return batchPool.borrowObject();
104 }
105 };
106
107 Future<Batch> future = executor.submit(task);
108
109 try {
110
111
112 Batch batch = future.get(1, TimeUnit.SECONDS);
113 LOG.info("Batch {} returned with success", batch.toString());
114 } catch (TimeoutException ex) {
115 if (i < CONCURRENT_WRITERS) {
116 fail();
117 } else {
118 LOG.info("Yaaaayyyyy! This is the blocked call!");
119 }
120 } finally {
121 future.cancel(true);
122 }
123
124 }
125
126 }
127
128 }