View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22  
23  import org.apache.omid.committable.CommitTable;
24  import org.apache.omid.metrics.Gauge;
25  import org.apache.omid.metrics.MetricsRegistry;
26  import org.apache.omid.timestamp.storage.TimestampStorage;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  import javax.inject.Inject;
31  import javax.inject.Singleton;
32  
33  import java.io.IOException;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ScheduledExecutorService;
36  import java.util.concurrent.TimeUnit;
37  
38  import static org.apache.omid.metrics.MetricsUtils.name;
39  
40  /**
41   * The Timestamp Oracle that gives monotonically increasing timestamps based on world time
42   */
43  @Singleton
44  public class WorldClockOracleImpl implements TimestampOracle {
45  
46      private static final Logger LOG = LoggerFactory.getLogger(WorldClockOracleImpl.class);
47  
48      static final long MAX_TX_PER_MS = 1_000_000; // 1 million
49      static final long TIMESTAMP_INTERVAL_MS = 10_000; // 10 seconds interval
50      private static final long TIMESTAMP_ALLOCATION_INTERVAL_MS = 7_000; // 7 seconds
51  
52      private long lastTimestamp;
53      private long maxTimestamp;
54  
55      private TimestampStorage storage;
56      private Panicker panicker;
57  
58      private volatile long maxAllocatedTime;
59  
60      private final ScheduledExecutorService scheduler =
61              Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
62  
63      private Runnable allocateTimestampsBatchTask;
64  
65      private class AllocateTimestampBatchTask implements Runnable {
66          long previousMaxTime;
67  
68          AllocateTimestampBatchTask(long previousMaxTime) {
69              this.previousMaxTime = previousMaxTime;
70          }
71  
72          @Override
73          public void run() {
74              long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
75              try {
76                  storage.updateMaxTimestamp(previousMaxTime, newMaxTime);
77                  maxAllocatedTime = newMaxTime;
78                  previousMaxTime = newMaxTime;
79              } catch (Throwable e) {
80                  panicker.panic("Can't store the new max timestamp", e);
81              }
82          }
83      }
84  
85      @Inject
86      public WorldClockOracleImpl(MetricsRegistry metrics,
87                                 TimestampStorage tsStorage,
88                                 Panicker panicker) throws IOException {
89  
90          this.storage = tsStorage;
91          this.panicker = panicker;
92  
93          metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
94              @Override
95              public Long getValue() {
96                  return maxTimestamp;
97              }
98          });
99  
100     }
101 
102     @Override
103     public void initialize() throws IOException {
104 
105         this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
106 
107         this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
108 
109         // Trigger first allocation of timestamps
110         scheduler.schedule(allocateTimestampsBatchTask, 0, TimeUnit.MILLISECONDS);
111 
112         // Waiting for the current epoch to start. Occurs in case of failover when the previous TSO allocated the current time frame.
113         while ((System.currentTimeMillis() * MAX_TX_PER_MS) < this.lastTimestamp) {
114             try {
115                 Thread.sleep(1000);
116             } catch (InterruptedException e) {
117                continue;
118             }
119         }
120 
121         // Launch the periodic timestamp interval allocation. In this case, the timestamp interval is extended even though the TSO is idle.
122         // Because we are world time based, this guarantees that the first request after a long time does not need to wait for new interval allocation.
123         scheduler.scheduleAtFixedRate(allocateTimestampsBatchTask, TIMESTAMP_ALLOCATION_INTERVAL_MS, TIMESTAMP_ALLOCATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
124     }
125 
126     /**
127      * Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
128      */
129     @Override
130     public long next() {
131 
132         long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS;
133 
134         lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
135 
136         // Return the next timestamp in case we are still in the same millisecond as the previous timestamp was. 
137         if (lastTimestamp >= currentMsFirstTimestamp) {
138             return lastTimestamp;
139         }
140 
141         if (currentMsFirstTimestamp >= maxTimestamp) { // Intentional race to reduce synchronization overhead in every access to maxTimestamp                                                                                                                       
142             while (maxAllocatedTime <= currentMsFirstTimestamp) { // Waiting for the interval allocation
143                 try {
144                     Thread.sleep(1000);
145                 } catch (InterruptedException e) {
146                    continue;
147                 }
148             }
149             assert (maxAllocatedTime > maxTimestamp);
150             maxTimestamp = maxAllocatedTime;
151         }
152 
153         lastTimestamp = currentMsFirstTimestamp;
154 
155         return lastTimestamp;
156     }
157 
158     @Override
159     public long getLast() {
160         return lastTimestamp;
161     }
162 
163     @Override
164     public String toString() {
165         return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
166     }
167 
168     @VisibleForTesting
169     static class InMemoryTimestampStorage implements TimestampStorage {
170 
171         long maxTime = 0;
172 
173         @Override
174         public void updateMaxTimestamp(long previousMaxTime, long nextMaxTime) {
175             maxTime = nextMaxTime;
176             LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTime, nextMaxTime);
177         }
178 
179         @Override
180         public long getMaxTimestamp() {
181             return maxTime;
182         }
183 
184     }
185 }