View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import com.google.common.util.concurrent.SettableFuture;
21  import org.apache.omid.committable.CommitTable;
22  
23  import java.io.IOException;
24  import java.util.Set;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  // TODO Would be nice to compile all util classes for testing to a separate package that clients could import for tests
28  public class MockTSOClient implements TSOProtocol {
29  
30      private static final int CONFLICT_MAP_SIZE = 1_000_000;
31  
32      private final AtomicLong timestampGenerator = new AtomicLong();
33      private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
34      private final AtomicLong lwm = new AtomicLong();
35  
36      private final CommitTable.Writer commitTable;
37  
38      public MockTSOClient(CommitTable.Writer commitTable) {
39          this.commitTable = commitTable;
40      }
41  
42      @Override
43      public TSOFuture<Long> getNewStartTimestamp() {
44          synchronized (conflictMap) {
45              SettableFuture<Long> f = SettableFuture.create();
46              f.set(timestampGenerator.incrementAndGet());
47              return new ForwardingTSOFuture<>(f);
48          }
49      }
50  
51      @Override
52      public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
53          synchronized (conflictMap) {
54              SettableFuture<Long> f = SettableFuture.create();
55              if (transactionId < lwm.get()) {
56                  f.setException(new AbortException());
57                  return new ForwardingTSOFuture<>(f);
58              }
59  
60              boolean canCommit = true;
61              for (CellId c : cells) {
62                  int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
63                  if (conflictMap[index] >= transactionId) {
64                      canCommit = false;
65                      break;
66                  }
67              }
68  
69              if (canCommit) {
70                  long commitTimestamp = timestampGenerator.incrementAndGet();
71                  for (CellId c : cells) {
72                      int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
73                      long oldVal = conflictMap[index];
74                      conflictMap[index] = commitTimestamp;
75                      long curLwm = lwm.get();
76                      while (oldVal > curLwm) {
77                          if (lwm.compareAndSet(curLwm, oldVal)) {
78                              break;
79                          }
80                          curLwm = lwm.get();
81                      }
82                  }
83  
84                  f.set(commitTimestamp);
85                  try {
86                      commitTable.addCommittedTransaction(transactionId, commitTimestamp);
87                      commitTable.updateLowWatermark(lwm.get());
88                      commitTable.flush();
89                  } catch (IOException ioe) {
90                      f.setException(ioe);
91                  }
92              } else {
93                  f.setException(new AbortException());
94              }
95              return new ForwardingTSOFuture<>(f);
96          }
97      }
98  
99      @Override
100     public TSOFuture<Void> close() {
101         SettableFuture<Void> f = SettableFuture.create();
102         f.set(null);
103         return new ForwardingTSOFuture<>(f);
104     }
105 
106     @Override
107     public long getEpoch() {
108         return 0;
109     }
110 
111 }