View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.examples;
19  
20  import org.apache.omid.transaction.HBaseTransactionManager;
21  import org.apache.omid.transaction.RollbackException;
22  import org.apache.omid.transaction.TTable;
23  import org.apache.omid.transaction.Transaction;
24  import org.apache.omid.transaction.TransactionManager;
25  import org.apache.commons.lang.StringUtils;
26  import org.apache.hadoop.hbase.client.Get;
27  import org.apache.hadoop.hbase.client.Put;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import java.util.Arrays;
34  
35  /**
36   * ****************************************************************************************************************
37   *
38   *  Example code which demonstrates the preservation of Snapshot Isolation when writing shared data concurrently
39   *
40   * ****************************************************************************************************************
41   *
42   * Please @see{BasicExample} first
43   *
44   * In the code below, two concurrent transactions (Tx1 & Tx2), try to update the same column in HBase. This will result
45   * in the rollback of Tx2 -the last one trying to commit- due to conflicts in the writeset with the previously
46   * committed transaction Tx1. Also shows how Tx2 reads the right values from its own snapshot in HBase data.
47   *
48   * After building the package with 'mvn clean package' find the resulting examples-<version>-bin.tar.gz file in the
49   * 'examples/target' folder. Copy it to the target host and expand with 'tar -zxvf examples-<version>-bin.tar.gz'.
50   *
51   * Make sure that 'hbase-site.xml' and 'core-site.xml' are either in classpath (see run.sh) or explicitly referenced via
52   * command line arguments. If a secure HBase deployment is needed, use also command line arguments to specify the
53   * principal (user) and keytab file.
54   *
55   * The example requires a user table to perform transactional read/write operations. A table is already specified in
56   * the default configuration, and can be created with the following command using the 'hbase shell':
57   *
58   * <pre>
59   * create 'MY_TX_TABLE', {NAME => 'MY_CF', VERSIONS => '2147483647', TTL => '2147483647'}
60   * </pre>
61   *
62   * Make sure that the principal/user has RW permissions for the given table using also the 'hbase shell':
63   * <pre>
64   * grant '<principal/user>', 'RW', 'MY_TX_TABLE'
65   * </pre>
66   *
67   * Alternatively, a table with a column family already created can be used by specifying the table name and column
68   * family identifiers using the command line arguments (see details also in 'run.sh') If a table namespace is required,
69   * specify it like this: 'namespace:table_name'
70   *
71   * Finally, run the example using the 'run.sh' script without arguments or specifying the necessary configuration
72   * parameters if required.
73   */
74  public class SnapshotIsolationExample {
75  
76      private static final Logger LOG = LoggerFactory.getLogger(SnapshotIsolationExample.class);
77  
78      public static void main(String[] args) throws Exception {
79  
80          LOG.info("Parsing the command line arguments");
81          String userTableName = "MY_TX_TABLE";
82          if (args != null && args.length > 0 && StringUtils.isNotEmpty(args[0])) {
83              userTableName = args[0];
84          }
85          byte[] family = Bytes.toBytes("MY_CF");
86          if (args != null && args.length > 1 && StringUtils.isNotEmpty(args[1])) {
87              family = Bytes.toBytes(args[1]);
88          }
89          LOG.info("Table '{}', column family '{}'", userTableName, Bytes.toString(family));
90  
91          byte[] exampleRow = Bytes.toBytes("EXAMPLE_ROW");
92          byte[] qualifier = Bytes.toBytes("MY_Q");
93          byte[] initialData = Bytes.toBytes("initialVal");
94          byte[] dataValue1 = Bytes.toBytes("val1");
95          byte[] dataValue2 = Bytes.toBytes("val2");
96  
97          LOG.info("--------------------------------------------------------------------------------------------------");
98          LOG.info("NOTE: All Transactions in the Example access column {}:{}/{}/{} [TABLE:ROW/CF/Q]",
99                   userTableName, Bytes.toString(exampleRow), Bytes.toString(family), Bytes.toString(qualifier));
100         LOG.info("--------------------------------------------------------------------------------------------------");
101 
102         LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
103         try (TransactionManager tm = HBaseTransactionManager.newInstance();
104              TTable txTable = new TTable(userTableName))
105         {
106 
107             // A transaction Tx0 sets an initial value to a particular column in an specific row
108             Transaction tx0 = tm.begin();
109             Put initialPut = new Put(exampleRow);
110             initialPut.add(family, qualifier, initialData);
111             txTable.put(tx0, initialPut);
112             tm.commit(tx0);
113             LOG.info("Initial Transaction {} COMMITTED. Base value written in {}:{}/{}/{} = {}",
114                      tx0, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
115                      Bytes.toString(qualifier), Bytes.toString(initialData));
116 
117             // Transaction Tx1 starts, creates its own snapshot of the current data in HBase and writes new data
118             Transaction tx1 = tm.begin();
119             LOG.info("Transaction {} STARTED", tx1);
120             Put tx1Put = new Put(exampleRow);
121             tx1Put.add(family, qualifier, dataValue1);
122             txTable.put(tx1, tx1Put);
123             LOG.info("Transaction {} updates base value in {}:{}/{}/{} = {} in its own Snapshot",
124                      tx1, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
125                      Bytes.toString(qualifier), Bytes.toString(dataValue1));
126 
127             // A concurrent transaction Tx2 starts, creates its own snapshot and reads the column value
128             Transaction tx2 = tm.begin();
129             LOG.info("Concurrent Transaction {} STARTED", tx2);
130             Get tx2Get = new Get(exampleRow);
131             tx2Get.addColumn(family, qualifier);
132             // As Tx1 is not yet committed, it should read the value set by Tx0 not the value written by Tx1
133             Result tx2GetResult = txTable.get(tx2, tx2Get);
134             assert Arrays.equals(tx2GetResult.value(), initialData);
135             LOG.info("Concurrent Transaction {} should read base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
136                      tx2, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
137                      Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
138 
139             // Transaction Tx1 tries to commit and as there're no conflicting changes, persists the new value in HBase
140             tm.commit(tx1);
141             LOG.info("Transaction {} COMMITTED. New column value {}:{}/{}/{} = {}",
142                      tx1, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
143                      Bytes.toString(qualifier), Bytes.toString(dataValue1));
144 
145             // Tx2 reading again after Tx1 commit must read data from its snapshot...
146             tx2Get = new Get(exampleRow);
147             tx2Get.addColumn(family, qualifier);
148             tx2GetResult = txTable.get(tx2, tx2Get);
149             // ...so it must read the initial value written by Tx0
150             LOG.info("Concurrent Transaction {} should read again base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
151                      tx2, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
152                      Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
153 
154             // Tx2 tries to write the column written by the committed concurrent transaction Tx1...
155             Put tx2Put = new Put(exampleRow);
156             tx2Put.add(family, qualifier, dataValue2);
157             txTable.put(tx2, tx2Put);
158             LOG.info("Concurrent Transaction {} updates {}:{}/{}/{} = {} in its own Snapshot (Will conflict with {} at commit time)",
159                      tx2, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
160                      Bytes.toString(qualifier), Bytes.toString(dataValue1), tx1);
161 
162             // ... and when committing, Tx2 has to abort due to concurrent conflicts with committed transaction Tx1
163             try {
164                 LOG.info("Concurrent Transaction {} TRYING TO COMMIT", tx2);
165                 tm.commit(tx2);
166             } catch (RollbackException e) {
167                 LOG.error("Concurrent Transaction {} ROLLED-BACK!!! : {}", tx2, e.getMessage());
168             }
169 
170         }
171 
172     }
173 
174 }