View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable;
19  
20  import com.google.common.base.Optional;
21  import com.google.common.util.concurrent.ListenableFuture;
22  import com.google.common.util.concurrent.SettableFuture;
23  import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
24  
25  import java.io.IOException;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  public class InMemoryCommitTable implements CommitTable {
29  
30      final ConcurrentHashMap<Long, Long> table = new ConcurrentHashMap<>();
31  
32      long lowWatermark;
33  
34      @Override
35      public CommitTable.Writer getWriter() {
36          return new Writer();
37      }
38  
39      @Override
40      public CommitTable.Client getClient() {
41          return new Client();
42      }
43  
44      public class Writer implements CommitTable.Writer {
45          @Override
46          public void addCommittedTransaction(long startTimestamp, long commitTimestamp) {
47              // In this implementation, we use only one location that represents
48              // both the value and the invalidation. Therefore, putIfAbsent is
49              // required to make sure the entry was not invalidated.
50              table.putIfAbsent(startTimestamp, commitTimestamp);
51          }
52  
53          @Override
54          public void updateLowWatermark(long lowWatermark) throws IOException {
55              InMemoryCommitTable.this.lowWatermark = lowWatermark;
56          }
57  
58          @Override
59          public void flush() throws IOException {
60              // noop
61          }
62  
63          @Override
64          public void clearWriteBuffer() {
65              table.clear();
66          }
67  
68          @Override
69          public void close() {
70          }
71      }
72  
73      public class Client implements CommitTable.Client {
74          @Override
75          public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
76              SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
77              Long result = table.get(startTimestamp);
78              if (result == null) {
79                  f.set(Optional.<CommitTimestamp>absent());
80              } else {
81                  if (result == INVALID_TRANSACTION_MARKER) {
82                      f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false)));
83                  } else {
84                      f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, result, true)));
85                  }
86              }
87              return f;
88          }
89  
90          @Override
91          public ListenableFuture<Long> readLowWatermark() {
92              SettableFuture<Long> f = SettableFuture.create();
93              f.set(lowWatermark);
94              return f;
95          }
96  
97          @Override
98          public ListenableFuture<Void> completeTransaction(long startTimestamp) {
99              SettableFuture<Void> f = SettableFuture.create();
100             table.remove(startTimestamp);
101             f.set(null);
102             return f;
103         }
104 
105         @Override
106         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
107 
108             SettableFuture<Boolean> f = SettableFuture.create();
109             Long old = table.get(startTimestamp);
110 
111             // If the transaction represented by startTimestamp is not in the map
112             if (old == null) {
113                 // Try to invalidate the transaction
114                 old = table.putIfAbsent(startTimestamp, INVALID_TRANSACTION_MARKER);
115                 // If we were able to invalidate or someone else invalidate before us
116                 if (old == null || old == INVALID_TRANSACTION_MARKER) {
117                     f.set(true);
118                     return f;
119                 }
120             } else {
121                 // Check if the value we read marked the transaction as invalid
122                 if (old == INVALID_TRANSACTION_MARKER) {
123                     f.set(true);
124                     return f;
125                 }
126             }
127 
128             // At this point the transaction was already in the map at the beginning
129             // of the method or was added right before we tried to invalidate.
130             f.set(false);
131             return f;
132         }
133 
134         @Override
135         public void close() {
136         }
137     }
138 
139     public int countElements() {
140         return table.size();
141     }
142 
143 }