View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.timestamp.storage;
19  
20  import org.apache.curator.framework.CuratorFramework;
21  import org.apache.curator.framework.recipes.atomic.AtomicValue;
22  import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
23  import org.apache.curator.retry.RetryNTimes;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  import javax.inject.Inject;
28  import java.io.IOException;
29  
30  import static org.apache.omid.timestamp.storage.ZKTimestampPaths.TIMESTAMP_ZNODE;
31  
32  class ZKTimestampStorage implements TimestampStorage {
33  
34      private static final Logger LOG = LoggerFactory.getLogger(ZKTimestampStorage.class);
35  
36      static final long INITIAL_MAX_TS_VALUE = 0;
37  
38      private final DistributedAtomicLong timestamp;
39  
40      @Inject
41      public ZKTimestampStorage(CuratorFramework zkClient) throws Exception {
42          LOG.info("ZK Client state {}", zkClient.getState());
43          timestamp = new DistributedAtomicLong(zkClient, TIMESTAMP_ZNODE, new RetryNTimes(3, 1000)); // TODO Configure
44          // this?
45          if (timestamp.initialize(INITIAL_MAX_TS_VALUE)) {
46              LOG.info("Timestamp value in ZNode initialized to {}", INITIAL_MAX_TS_VALUE);
47          }
48      }
49  
50      @Override
51      public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
52  
53          if (newMaxTimestamp < 0) {
54              LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
55              throw new IllegalArgumentException();
56          }
57          if (newMaxTimestamp <= previousMaxTimestamp) {
58              LOG.error("maxTimestamp {} <= previousMaxTimesamp: {}", newMaxTimestamp, previousMaxTimestamp);
59              throw new IllegalArgumentException();
60          }
61          AtomicValue<Long> compareAndSet;
62          try {
63              compareAndSet = timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp);
64          } catch (Exception e) {
65              throw new IOException("Problem setting timestamp in ZK", e);
66          }
67          if (!compareAndSet.succeeded()) { // We have to explicitly check for success (See Curator doc)
68              throw new IOException("GetAndSet operation for storing timestamp in ZK did not succeed "
69                      + compareAndSet.preValue() + " " + compareAndSet.postValue());
70          }
71  
72      }
73  
74      @Override
75      public long getMaxTimestamp() throws IOException {
76  
77          AtomicValue<Long> atomicValue;
78          try {
79              atomicValue = timestamp.get();
80          } catch (Exception e) {
81              throw new IOException("Problem getting data from ZK", e);
82          }
83          if (!atomicValue.succeeded()) { // We have to explicitly check for success (See Curator doc)
84              throw new IOException("Get operation to obtain timestamp from ZK did not succeed");
85          }
86          return atomicValue.postValue();
87  
88      }
89  
90  }