1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.omid.timestamp.storage;
19  
20  import org.apache.curator.framework.CuratorFramework;
21  import org.apache.curator.framework.recipes.atomic.AtomicValue;
22  import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
23  import org.apache.curator.retry.RetryNTimes;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  import javax.inject.Inject;
28  import java.io.IOException;
29  
30  import static org.apache.omid.timestamp.storage.ZKTimestampPaths.TIMESTAMP_ZNODE;
31  
32  class ZKTimestampStorage implements TimestampStorage {
33  
34      private static final Logger LOG = LoggerFactory.getLogger(ZKTimestampStorage.class);
35  
36      static final long INITIAL_MAX_TS_VALUE = 0;
37  
38      private final DistributedAtomicLong timestamp;
39  
40      @Inject
41      public ZKTimestampStorage(CuratorFramework zkClient) throws Exception {
42          LOG.info("ZK Client state {}", zkClient.getState());
43          timestamp = new DistributedAtomicLong(zkClient, TIMESTAMP_ZNODE, new RetryNTimes(3, 1000)); 
44          
45          if (timestamp.initialize(INITIAL_MAX_TS_VALUE)) {
46              LOG.info("Timestamp value in ZNode initialized to {}", INITIAL_MAX_TS_VALUE);
47          }
48      }
49  
50      @Override
51      public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
52  
53          if (newMaxTimestamp < 0) {
54              LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
55              throw new IllegalArgumentException();
56          }
57          if (newMaxTimestamp <= previousMaxTimestamp) {
58              LOG.error("maxTimestamp {} <= previousMaxTimesamp: {}", newMaxTimestamp, previousMaxTimestamp);
59              throw new IllegalArgumentException();
60          }
61          AtomicValue<Long> compareAndSet;
62          try {
63              compareAndSet = timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp);
64          } catch (Exception e) {
65              throw new IOException("Problem setting timestamp in ZK", e);
66          }
67          if (!compareAndSet.succeeded()) { 
68              throw new IOException("GetAndSet operation for storing timestamp in ZK did not succeed "
69                      + compareAndSet.preValue() + " " + compareAndSet.postValue());
70          }
71  
72      }
73  
74      @Override
75      public long getMaxTimestamp() throws IOException {
76  
77          AtomicValue<Long> atomicValue;
78          try {
79              atomicValue = timestamp.get();
80          } catch (Exception e) {
81              throw new IOException("Problem getting data from ZK", e);
82          }
83          if (!atomicValue.succeeded()) { 
84              throw new IOException("Get operation to obtain timestamp from ZK did not succeed");
85          }
86          return atomicValue.postValue();
87  
88      }
89  
90  }