View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.util.concurrent.SettableFuture;
21  import org.apache.omid.committable.CommitTable;
22  
23  import java.io.IOException;
24  import java.util.Set;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  class MockTSOClient implements TSOProtocol {
28      private final AtomicLong timestampGenerator = new AtomicLong();
29      private static final int CONFLICT_MAP_SIZE = 1_000_000;
30      private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
31      private final AtomicLong lwm = new AtomicLong();
32  
33      private final CommitTable.Writer commitTable;
34  
35      MockTSOClient(CommitTable.Writer commitTable) {
36          this.commitTable = commitTable;
37      }
38  
39      @Override
40      public TSOFuture<Long> getNewStartTimestamp() {
41          synchronized (conflictMap) {
42              SettableFuture<Long> f = SettableFuture.create();
43              f.set(timestampGenerator.incrementAndGet());
44              return new ForwardingTSOFuture<>(f);
45          }
46      }
47  
48      @Override
49      public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
50          synchronized (conflictMap) {
51              SettableFuture<Long> f = SettableFuture.create();
52              if (transactionId < lwm.get()) {
53                  f.setException(new AbortException());
54                  return new ForwardingTSOFuture<>(f);
55              }
56  
57              boolean canCommit = true;
58              for (CellId c : cells) {
59                  int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
60                  if (conflictMap[index] >= transactionId) {
61                      canCommit = false;
62                      break;
63                  }
64              }
65  
66              if (canCommit) {
67                  long commitTimestamp = timestampGenerator.incrementAndGet();
68                  for (CellId c : cells) {
69                      int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
70                      long oldVal = conflictMap[index];
71                      conflictMap[index] = commitTimestamp;
72                      long curLwm = lwm.get();
73                      while (oldVal > curLwm) {
74                          if (lwm.compareAndSet(curLwm, oldVal)) {
75                              break;
76                          }
77                          curLwm = lwm.get();
78                      }
79                  }
80  
81                  f.set(commitTimestamp);
82                  try {
83                      commitTable.addCommittedTransaction(transactionId, commitTimestamp);
84                      commitTable.updateLowWatermark(lwm.get());
85                      commitTable.flush();
86                  } catch (IOException ioe) {
87                      f.setException(ioe);
88                  }
89              } else {
90                  f.setException(new AbortException());
91              }
92              return new ForwardingTSOFuture<>(f);
93          }
94      }
95  
96      @Override
97      public TSOFuture<Void> close() {
98          SettableFuture<Void> f = SettableFuture.create();
99          f.set(null);
100         return new ForwardingTSOFuture<>(f);
101     }
102 }