View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  
23  import org.apache.omid.metrics.Gauge;
24  import org.apache.omid.metrics.MetricsRegistry;
25  import org.apache.omid.timestamp.storage.TimestampStorage;
26  import org.apache.omid.transaction.AbstractTransactionManager;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  import javax.inject.Inject;
31  import javax.inject.Singleton;
32  
33  import java.io.IOException;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Executors;
36  
37  import static org.apache.omid.metrics.MetricsUtils.name;
38  
39  /**
40   * The Timestamp Oracle that gives monotonically increasing timestamps.
41   */
42  @Singleton
43  public class TimestampOracleImpl implements TimestampOracle {
44  
45      private static final Logger LOG = LoggerFactory.getLogger(TimestampOracleImpl.class);
46  
47      @VisibleForTesting
48      static class InMemoryTimestampStorage implements TimestampStorage {
49  
50          long maxTimestamp = 0;
51  
52          @Override
53          public void updateMaxTimestamp(long previousMaxTimestamp, long nextMaxTimestamp) {
54              maxTimestamp = nextMaxTimestamp;
55              LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTimestamp, nextMaxTimestamp);
56          }
57  
58          @Override
59          public long getMaxTimestamp() {
60              return maxTimestamp;
61          }
62  
63      }
64  
65      private class AllocateTimestampBatchTask implements Runnable {
66          long previousMaxTimestamp;
67  
68          AllocateTimestampBatchTask(long previousMaxTimestamp) {
69              this.previousMaxTimestamp = previousMaxTimestamp;
70          }
71  
72          @Override
73          public void run() {
74              long newMaxTimestamp = previousMaxTimestamp + TIMESTAMP_BATCH;
75              try {
76                  storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
77                  maxAllocatedTimestamp = newMaxTimestamp;
78                  previousMaxTimestamp = newMaxTimestamp;
79              } catch (Throwable e) {
80                  panicker.panic("Can't store the new max timestamp", e);
81              }
82          }
83  
84      }
85  
86      static final long TIMESTAMP_BATCH = 10_000_000*AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 10 million
87      private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000*AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 1 million
88  
89      private long lastTimestamp;
90  
91      private long maxTimestamp;
92  
93      private TimestampStorage storage;
94      private Panicker panicker;
95  
96      private long nextAllocationThreshold;
97      private volatile long maxAllocatedTimestamp;
98  
99      private Executor executor = Executors.newSingleThreadExecutor(
100             new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
101 
102     private Runnable allocateTimestampsBatchTask;
103 
104     @Inject
105     public TimestampOracleImpl(MetricsRegistry metrics,
106                                TimestampStorage tsStorage,
107                                Panicker panicker) throws IOException {
108 
109         this.storage = tsStorage;
110         this.panicker = panicker;
111 
112         metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
113             @Override
114             public Long getValue() {
115                 return maxTimestamp;
116             }
117         });
118 
119     }
120 
121     @Override
122     public void initialize() throws IOException {
123 
124         this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
125 
126         this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
127 
128         // Trigger first allocation of timestamps
129         executor.execute(allocateTimestampsBatchTask);
130 
131         LOG.info("Initializing timestamp oracle with timestamp {}", this.lastTimestamp);
132     }
133 
134     /**
135      * Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
136      */
137     @SuppressWarnings("StatementWithEmptyBody")
138     @Override
139     public long next() {
140         lastTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
141 
142         if (lastTimestamp >= nextAllocationThreshold) {
143             // set the nextAllocationThread to max value of long in order to
144             // make sure only one call to this function will execute a thread to extend the timestamp batch.
145             nextAllocationThreshold = Long.MAX_VALUE; 
146             executor.execute(allocateTimestampsBatchTask);
147         }
148 
149         if (lastTimestamp >= maxTimestamp) {
150             assert (maxTimestamp <= maxAllocatedTimestamp);
151             while (maxAllocatedTimestamp == maxTimestamp) {
152                 // spin
153             }
154             assert (maxAllocatedTimestamp > maxTimestamp);
155             maxTimestamp = maxAllocatedTimestamp;
156             nextAllocationThreshold = maxTimestamp - TIMESTAMP_REMAINING_THRESHOLD;
157             assert (nextAllocationThreshold > lastTimestamp && nextAllocationThreshold < maxTimestamp);
158             assert (lastTimestamp < maxTimestamp);
159         }
160 
161         return lastTimestamp;
162     }
163 
164     @Override
165     public long getLast() {
166         return lastTimestamp;
167     }
168 
169     @Override
170     public String toString() {
171         return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
172     }
173 
174 }