View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.committable;
19  
20  import com.google.common.base.Optional;
21  import com.google.common.util.concurrent.ListenableFuture;
22  import com.google.common.util.concurrent.SettableFuture;
23  import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
24  
25  import java.io.IOException;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  public class InMemoryCommitTable implements CommitTable {
29  
30      final ConcurrentHashMap<Long, Long> table = new ConcurrentHashMap<>();
31  
32      long lowWatermark;
33  
34      @Override
35      public CommitTable.Writer getWriter() {
36          return new Writer();
37      }
38  
39      @Override
40      public CommitTable.Client getClient() {
41          return new Client();
42      }
43  
44      public class Writer implements CommitTable.Writer {
45          @Override
46          public void addCommittedTransaction(long startTimestamp, long commitTimestamp) {
47              // In this implementation, we use only one location that represents
48              // both the value and the invalidation. Therefore, putIfAbsent is
49              // required to make sure the entry was not invalidated.
50              table.putIfAbsent(startTimestamp, commitTimestamp);
51          }
52  
53          @Override
54          public void updateLowWatermark(long lowWatermark) throws IOException {
55              InMemoryCommitTable.this.lowWatermark = lowWatermark;
56          }
57  
58          @Override
59          public void flush() throws IOException {
60              // noop
61          }
62  
63          @Override
64          public void clearWriteBuffer() {
65              table.clear();
66          }
67  
68          @Override
69          public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
70              // In this implementation, we use only one location that represents
71              // both the value and the invalidation. Therefore, putIfAbsent is
72              // required to make sure the entry was not invalidated.
73              return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
74          }
75  
76          @Override
77          public void close() {
78          }
79      }
80  
81      public class Client implements CommitTable.Client {
82          @Override
83          public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
84              SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
85              Long result = table.get(startTimestamp);
86              if (result == null) {
87                  f.set(Optional.<CommitTimestamp>absent());
88              } else {
89                  if (result == INVALID_TRANSACTION_MARKER) {
90                      f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false)));
91                  } else {
92                      f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, result, true)));
93                  }
94              }
95              return f;
96          }
97  
98          @Override
99          public ListenableFuture<Long> readLowWatermark() {
100             SettableFuture<Long> f = SettableFuture.create();
101             f.set(lowWatermark);
102             return f;
103         }
104 
105         @Override
106         public ListenableFuture<Void> completeTransaction(long startTimestamp) {
107             SettableFuture<Void> f = SettableFuture.create();
108             table.remove(startTimestamp);
109             f.set(null);
110             return f;
111         }
112 
113         @Override
114         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
115 
116             SettableFuture<Boolean> f = SettableFuture.create();
117             Long old = table.get(startTimestamp);
118 
119             // If the transaction represented by startTimestamp is not in the map
120             if (old == null) {
121                 // Try to invalidate the transaction
122                 old = table.putIfAbsent(startTimestamp, INVALID_TRANSACTION_MARKER);
123                 // If we were able to invalidate or someone else invalidate before us
124                 if (old == null || old == INVALID_TRANSACTION_MARKER) {
125                     f.set(true);
126                     return f;
127                 }
128             } else {
129                 // Check if the value we read marked the transaction as invalid
130                 if (old == INVALID_TRANSACTION_MARKER) {
131                     f.set(true);
132                     return f;
133                 }
134             }
135 
136             // At this point the transaction was already in the map at the beginning
137             // of the method or was added right before we tried to invalidate.
138             f.set(false);
139             return f;
140         }
141 
142         @Override
143         public void close() {
144         }
145     }
146 
147     public int countElements() {
148         return table.size();
149     }
150 
151 }