1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.timestamp.storage;
19
20 import org.apache.curator.framework.CuratorFramework;
21 import org.apache.curator.framework.recipes.atomic.AtomicValue;
22 import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
23 import org.apache.curator.retry.RetryNTimes;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import javax.inject.Inject;
28 import java.io.IOException;
29
30 import static org.apache.omid.timestamp.storage.ZKTimestampPaths.TIMESTAMP_ZNODE;
31
32 class ZKTimestampStorage implements TimestampStorage {
33
34 private static final Logger LOG = LoggerFactory.getLogger(ZKTimestampStorage.class);
35
36 static final long INITIAL_MAX_TS_VALUE = 0;
37
38 private final DistributedAtomicLong timestamp;
39
40 @Inject
41 public ZKTimestampStorage(CuratorFramework zkClient) throws Exception {
42 LOG.info("ZK Client state {}", zkClient.getState());
43 timestamp = new DistributedAtomicLong(zkClient, TIMESTAMP_ZNODE, new RetryNTimes(3, 1000));
44
45 if (timestamp.initialize(INITIAL_MAX_TS_VALUE)) {
46 LOG.info("Timestamp value in ZNode initialized to {}", INITIAL_MAX_TS_VALUE);
47 }
48 }
49
50 @Override
51 public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
52
53 if (newMaxTimestamp < 0) {
54 LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
55 throw new IllegalArgumentException();
56 }
57 if (newMaxTimestamp <= previousMaxTimestamp) {
58 LOG.error("maxTimestamp {} <= previousMaxTimesamp: {}", newMaxTimestamp, previousMaxTimestamp);
59 throw new IllegalArgumentException();
60 }
61 AtomicValue<Long> compareAndSet;
62 try {
63 compareAndSet = timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp);
64 } catch (Exception e) {
65 throw new IOException("Problem setting timestamp in ZK", e);
66 }
67 if (!compareAndSet.succeeded()) {
68 throw new IOException("GetAndSet operation for storing timestamp in ZK did not succeed "
69 + compareAndSet.preValue() + " " + compareAndSet.postValue());
70 }
71
72 }
73
74 @Override
75 public long getMaxTimestamp() throws IOException {
76
77 AtomicValue<Long> atomicValue;
78 try {
79 atomicValue = timestamp.get();
80 } catch (Exception e) {
81 throw new IOException("Problem getting data from ZK", e);
82 }
83 if (!atomicValue.succeeded()) {
84 throw new IOException("Get operation to obtain timestamp from ZK did not succeed");
85 }
86 return atomicValue.postValue();
87
88 }
89
90 }