View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import org.apache.omid.metrics.Gauge;
23  import org.apache.omid.metrics.MetricsRegistry;
24  import org.apache.omid.timestamp.storage.TimestampStorage;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import javax.inject.Inject;
29  import javax.inject.Singleton;
30  import java.io.IOException;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.Executors;
33  
34  import static org.apache.omid.metrics.MetricsUtils.name;
35  
36  /**
37   * The Timestamp Oracle that gives monotonically increasing timestamps
38   */
39  @Singleton
40  public class TimestampOracleImpl implements TimestampOracle {
41  
42      private static final Logger LOG = LoggerFactory.getLogger(TimestampOracleImpl.class);
43  
44      @VisibleForTesting
45      static class InMemoryTimestampStorage implements TimestampStorage {
46  
47          long maxTimestamp = 0;
48  
49          @Override
50          public void updateMaxTimestamp(long previousMaxTimestamp, long nextMaxTimestamp) {
51              maxTimestamp = nextMaxTimestamp;
52              LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTimestamp, nextMaxTimestamp);
53          }
54  
55          @Override
56          public long getMaxTimestamp() {
57              return maxTimestamp;
58          }
59  
60      }
61  
62      private class AllocateTimestampBatchTask implements Runnable {
63          long previousMaxTimestamp;
64  
65          AllocateTimestampBatchTask(long previousMaxTimestamp) {
66              this.previousMaxTimestamp = previousMaxTimestamp;
67          }
68  
69          @Override
70          public void run() {
71              long newMaxTimestamp = previousMaxTimestamp + TIMESTAMP_BATCH;
72              try {
73                  storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
74                  maxAllocatedTimestamp = newMaxTimestamp;
75                  previousMaxTimestamp = newMaxTimestamp;
76              } catch (Throwable e) {
77                  panicker.panic("Can't store the new max timestamp", e);
78              }
79          }
80  
81      }
82  
83      static final long TIMESTAMP_BATCH = 10_000_000; // 10 million
84      private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000; // 1 million
85  
86      private long lastTimestamp;
87  
88      private long maxTimestamp;
89  
90      private TimestampStorage storage;
91      private Panicker panicker;
92  
93      private long nextAllocationThreshold;
94      private volatile long maxAllocatedTimestamp;
95  
96      private Executor executor = Executors.newSingleThreadExecutor(
97              new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
98  
99      private Runnable allocateTimestampsBatchTask;
100 
101     @Inject
102     public TimestampOracleImpl(MetricsRegistry metrics,
103                                TimestampStorage tsStorage,
104                                Panicker panicker) throws IOException {
105 
106         this.storage = tsStorage;
107         this.panicker = panicker;
108 
109         metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
110             @Override
111             public Long getValue() {
112                 return maxTimestamp;
113             }
114         });
115 
116     }
117 
118     @Override
119     public void initialize() throws IOException {
120 
121         this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
122 
123         this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
124 
125         // Trigger first allocation of timestamps
126         executor.execute(allocateTimestampsBatchTask);
127 
128         LOG.info("Initializing timestamp oracle with timestamp {}", this.lastTimestamp);
129     }
130 
131     /**
132      * Returns the next timestamp if available. Otherwise spins till the
133      * ts-persist thread performs the new timestamp allocation
134      */
135     @SuppressWarnings("StatementWithEmptyBody")
136     @Override
137     public long next() throws IOException {
138         lastTimestamp++;
139 
140         if (lastTimestamp == nextAllocationThreshold) {
141             executor.execute(allocateTimestampsBatchTask);
142         }
143 
144         if (lastTimestamp >= maxTimestamp) {
145             assert (maxTimestamp <= maxAllocatedTimestamp);
146             while (maxAllocatedTimestamp == maxTimestamp) {
147                 // spin
148             }
149             assert (maxAllocatedTimestamp > maxTimestamp);
150             maxTimestamp = maxAllocatedTimestamp;
151             nextAllocationThreshold = maxTimestamp - TIMESTAMP_REMAINING_THRESHOLD;
152             assert (nextAllocationThreshold > lastTimestamp && nextAllocationThreshold < maxTimestamp);
153             assert (lastTimestamp < maxTimestamp);
154         }
155 
156         return lastTimestamp;
157     }
158 
159     @Override
160     public long getLast() {
161         return lastTimestamp;
162     }
163 
164     @Override
165     public String toString() {
166         return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
167     }
168 
169 }