View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
22  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
23  import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
24  
25  import java.io.IOException;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  public class InMemoryCommitTable implements CommitTable {
29  
30      final ConcurrentHashMap<Long, Long> table = new ConcurrentHashMap<>();
31  
32      long lowWatermark;
33  
34      @Override
35      public CommitTable.Writer getWriter() {
36          return new Writer();
37      }
38  
39      @Override
40      public CommitTable.Client getClient() {
41          return new Client();
42      }
43  
44      public class Writer implements CommitTable.Writer {
45          @Override
46          public void addCommittedTransaction(long startTimestamp, long commitTimestamp) {
47              // In this implementation, we use only one location that represents
48              // both the value and the invalidation. Therefore, putIfAbsent is
49              // required to make sure the entry was not invalidated.
50              table.putIfAbsent(startTimestamp, commitTimestamp);
51          }
52  
53          @Override
54          public void updateLowWatermark(long lowWatermark) throws IOException {
55              InMemoryCommitTable.this.lowWatermark = lowWatermark;
56          }
57  
58          @Override
59          public void flush() throws IOException {
60              // noop
61          }
62  
63          @Override
64          public void clearWriteBuffer() {
65              table.clear();
66          }
67  
68          @Override
69          public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
70              startTimestamp = removeCheckpointBits(startTimestamp);
71              // In this implementation, we use only one location that represents
72              // both the value and the invalidation. Therefore, putIfAbsent is
73              // required to make sure the entry was not invalidated.
74              return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
75          }
76      }
77  
78      public class Client implements CommitTable.Client {
79          @Override
80          public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
81              startTimestamp = removeCheckpointBits(startTimestamp);
82              SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
83              Long result = table.get(startTimestamp);
84              if (result == null) {
85                  f.set(Optional.<CommitTimestamp>absent());
86              } else {
87                  if (result == INVALID_TRANSACTION_MARKER) {
88                      f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false)));
89                  } else {
90                      f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, result, true)));
91                  }
92              }
93              return f;
94          }
95  
96          @Override
97          public ListenableFuture<Long> readLowWatermark() {
98              SettableFuture<Long> f = SettableFuture.create();
99              f.set(lowWatermark);
100             return f;
101         }
102 
103         @Override
104         public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
105             startTimestamp = removeCheckpointBits(startTimestamp);
106             SettableFuture<Void> f = SettableFuture.create();
107             table.remove(startTimestamp);
108             f.set(null);
109             return f;
110         }
111 
112         @Override
113         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
114             startTimestamp = removeCheckpointBits(startTimestamp);
115             SettableFuture<Boolean> f = SettableFuture.create();
116             Long old = table.get(startTimestamp);
117 
118             // If the transaction represented by startTimestamp is not in the map
119             if (old == null) {
120                 // Try to invalidate the transaction
121                 old = table.putIfAbsent(startTimestamp, INVALID_TRANSACTION_MARKER);
122                 // If we were able to invalidate or someone else invalidate before us
123                 if (old == null || old == INVALID_TRANSACTION_MARKER) {
124                     f.set(true);
125                     return f;
126                 }
127             } else {
128                 // Check if the value we read marked the transaction as invalid
129                 if (old == INVALID_TRANSACTION_MARKER) {
130                     f.set(true);
131                     return f;
132                 }
133             }
134 
135             // At this point the transaction was already in the map at the beginning
136             // of the method or was added right before we tried to invalidate.
137             f.set(false);
138             return f;
139         }
140     }
141 
142     public int countElements() {
143         return table.size();
144     }
145 
146     static long removeCheckpointBits(long startTimestamp) {
147         return startTimestamp - (startTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN);
148     }
149 }