1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
21 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
23 import org.apache.commons.pool2.BasePooledObjectFactory;
24 import org.apache.commons.pool2.PooledObject;
25 import org.apache.commons.pool2.impl.DefaultPooledObject;
26 import org.jboss.netty.channel.Channel;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import java.util.Arrays;
31
32 public class Batch {
33
34 private static final Logger LOG = LoggerFactory.getLogger(Batch.class);
35
36 private final int id;
37 private final int size;
38 private int numEvents;
39 private final PersistEvent[] events;
40
41 Batch(int id, int size) {
42
43 Preconditions.checkArgument(size > 0, "Size [%s] must be positive", size);
44 this.size = size;
45 this.id = id;
46 this.numEvents = 0;
47 this.events = new PersistEvent[size];
48 for (int i = 0; i < size; i++) {
49 this.events[i] = new PersistEvent();
50 }
51 LOG.info("Batch id {} created with size {}", id, size);
52
53 }
54
55 PersistEvent get(int idx) {
56 Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents,
57 "Accessing Events array (Size = %s) with wrong index [%s]", numEvents, idx);
58 return events[idx];
59 }
60
61 void set(int idx, PersistEvent event) {
62 Preconditions.checkState(0 <= idx && idx < numEvents);
63 events[idx] = event;
64 }
65
66 void clear() {
67
68 numEvents = 0;
69
70 }
71
72 void decreaseNumEvents() {
73 numEvents--;
74 }
75
76 int getNumEvents() {
77 return numEvents;
78 }
79
80 int getLastEventIdx() {
81 return numEvents - 1;
82 }
83
84 boolean isFull() {
85
86 Preconditions.checkState(numEvents <= size, "Batch Full: numEvents [%s] > size [%s]", numEvents, size);
87 return numEvents == size;
88
89 }
90
91 boolean isEmpty() {
92
93 return numEvents == 0;
94
95 }
96
97 void addTimestamp(long startTimestamp, Channel c, MonitoringContext context) {
98
99 Preconditions.checkState(!isFull(), "batch is full");
100 int index = numEvents++;
101 PersistEvent e = events[index];
102 context.timerStart("persistence.processor.timestamp.latency");
103 e.makePersistTimestamp(startTimestamp, c, context);
104
105 }
106
107 void addFence(long tableID, long fenceTimestamp, Channel c, MonitoringContext context) {
108
109 Preconditions.checkState(!isFull(), "batch is full");
110 int index = numEvents++;
111 PersistEvent e = events[index];
112 context.timerStart("persistence.processor.fence.latency");
113 e.makePersistFence(tableID, fenceTimestamp, c, context);
114
115 }
116
117 void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context, Optional<Long> newLowWatermark) {
118
119 Preconditions.checkState(!isFull(), "batch is full");
120 int index = numEvents++;
121 PersistEvent e = events[index];
122 context.timerStart("persistence.processor.commit.latency");
123 e.makePersistCommit(startTimestamp, commitTimestamp, newLowWatermark, c, context);
124
125 }
126
127 void addCommitRetry(long startTimestamp, Channel c, MonitoringContext context) {
128
129 Preconditions.checkState(!isFull(), "batch is full");
130 int index = numEvents++;
131 PersistEvent e = events[index];
132 context.timerStart("persistence.processor.commit-retry.latency");
133 e.makeCommitRetry(startTimestamp, c, context);
134
135 }
136
137 void addAbort(long startTimestamp, Channel c, MonitoringContext context) {
138
139 Preconditions.checkState(!isFull(), "batch is full");
140 int index = numEvents++;
141 PersistEvent e = events[index];
142 context.timerStart("persistence.processor.abort.latency");
143 e.makePersistAbort(startTimestamp, c, context);
144
145 }
146
147 @Override
148 public String toString() {
149 return MoreObjects.toStringHelper(this)
150 .add("id", id)
151 .add("size", size)
152 .add("num events", numEvents)
153 .add("events", Arrays.toString(events))
154 .toString();
155 }
156
157 static class BatchFactory extends BasePooledObjectFactory<Batch> {
158
159 private static int batchId = 0;
160
161 private int batchSize;
162
163 BatchFactory(int batchSize) {
164 this.batchSize = batchSize;
165 }
166
167 @Override
168 public Batch create() throws Exception {
169 return new Batch(batchId++, batchSize);
170 }
171
172 @Override
173 public PooledObject<Batch> wrap(Batch batch) {
174 return new DefaultPooledObject<>(batch);
175 }
176
177 @Override
178 public void passivateObject(PooledObject<Batch> pooledObject) {
179 pooledObject.getObject().clear();
180 }
181
182 }
183
184 }