Clustering your application with Hazelcast Talip Ozturk talip@hazelcast.com Whoweare · Foundedin2008 · Startupcompanyfoundedby TalipOzturkandFuadMalikov · Opensourcebusinessmodel.ConsulCngand supportonHazelcast. · Hundredsofusers.MostlyinUSandEurope. · Referencesavailableuponrequest. HazelcastEvents Agenda · Demo · IntroducCon · CodeSamples · Internals · Q/A Map import java.util.Map; import java.util.HashMap; Map map = new HashMap(); map.put("1", "value"); map.get("1"); ConcurrentMap import java.util.Map; import java.util.concurrent.ConcurrentHashMap; Map map = new ConcurrentHashMap(); map.put("1", "value"); map.get("1"); DistributedMap import java.util.Map; import com.hazelcast.core.Hazelcast; Map map = Hazelcast.getMap("mymap"); map.put("1", "value"); map.get("1"); WhyHazelcast? · ScaleyourapplicaCon · Sharedataacrosscluster · ParCConyourdata · Send/receivemessages · Balancetheload · ProcessinparallelonmanyJVM SoluConsintheMarket · · · · · · OracleCoherence IBMWebSphereeXtremeScale/ObjectGrid TerracoWa Gigaspaces Gemstone JBossCache/JGroups/Infinispan Difference · License/Cost · Featureset · Easeofuse · Mainfocus(distributedmap,tuplespace,cache, processingvs.data) · Light/Heavyweight IntroducingHazelcast · Opensource(ApacheLicense) · Superlight,simple,nodependency · Distributed/parCConedimplementaConofmap,queue, set,list,lockandexecutorservice · TransacConal(JCAsupport) · Secure · Topicforpub/submessaging · Clusterinfoandmembershipevents · Dynamicclustering,backup,failover DataParCConinginaCluster Ifyouhave5millionobjectsinyour5nodecluster, theneachnodewillcarry 1millionobjectsand1millionbackupobjects. Server1 Server2 Server3 Server4 Server5 SuperClientinaCluster · -Dhazelcast.super.client=true ·Asfastasanymemberinthecluster ·Holdsnodata Server1 Server2 Server3 Server4 Server5 HazelcastNetwork CodeSamples­ClusterInterface import com.hazelcast.core.*; import java.util.Set; Cluster cluster = Hazelcast.getCluster(); cluster.addMembershipListener(listener); Member localMember = cluster.getLocalMember(); System.out.println (localMember.getInetAddress()); Set setMembers = cluster.getMembers(); CodeSamples­DistributedMap import com.hazelcast.core.Hazelcast; import java.util.Map; Map map = Hazelcast.getMap("customers"); map.put ("1", customer); Customer c = map.get("1"); CodeSamples­DistributedMulCMap import com.hazelcast.core.Hazelcast; mport com.hazelcast.core.MultiMap; MultiMap map = Hazelcast.getMultiMap("orders"); map.put map.put map.put map.put ("1", ("1", ("1", ("2", new new new new Order Order Order Order ("iPhone", 340)); ("MacBook", 1200)); ("iPod", 79)); ("iMac", 1500)); Collection colOrders = map.get ("1"); for (Order order : colOrders) { // process order } boolean removed = map.remove("1", new Order("iPod", 79)); CodeSamples­DistributedQueue import com.hazelcast.core.Hazelcast; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; BlockingQueue queue = Hazelcast.getQueue("tasks"); queue.offer(task); Task t = queue.poll(); Task t = queue.poll(5, TimeUnit.SECONDS); CodeSamples­DistributedSet import com.hazelcast.core.Hazelcast; import java.util.Set; Set set = Hazelcast.getSet("IBM-Quote-History"); set.add (new Price (10, time1)); set.add (new Price (11, time2)); set.add (new Price (13, time3)); for (Price price : set) { // process price } CodeSamples­DistributedLock import com.hazelcast.core.Hazelcast; import java.util.concurrent.locks.Lock; Lock mylock = Hazelcast.getLock(mylockobject); mylock.lock(); try { // do something } finally { mylock.unlock(); } CodeSamples­DistributedTopic import com.hazelcast.core.*; public class Sample implements MessageListener { public static void main(String[] args) { Sample sample = new Sample(); ITopic topic = Hazelcast.getTopic ("default"); topic.addMessageListener(sample); topic.publish ("my-message-object"); } public void onMessage(Object msg) { System.out.println("Got msg :" + msg); } } CodeSamples­DistributedEvents import import import import com.hazelcast.core.IMap; com.hazelcast.core.Hazelcast; com.hazelcast.core.EntryListener; com.hazelcast.core.EntryEvent; public class Sample implements EntryListener { public static void main(String[] args) { Sample sample = new Sample(); IMap map = Hazelcast.getMap ("default"); map.addEntryListener (sample, true); map.addEntryListener (sample, "key"); } public void entryAdded(EntryEvent event) { System.out.println("Added " + event.getKey() + ":" + event.getValue()); } public void entryRemoved(EntryEvent event) { System.out.println("Removed " + event.getKey() + ":" + event.getValue()); } public void entryUpdated(EntryEvent event) { System.out.println("Updated " + event.getKey() + ":" + event.getValue()); } } CodeSamples­TransacCons import import import import com.hazelcast.core.Hazelcast; com.hazelcast.core.Transaction; java.util.Map; java.util.Queue; Map map = Hazelcast.getMap ("default"); Queue queue = Hazelcast.getQueue ("default"); Transaction txn = Hazelcast.getTransaction(); txn.begin(); try { //process obj map.put (key, obj); // process here. txn.commit(); } catch (Exception e) { txn.rollback(); } CodeSamples­Query public class Employee { private boolean active; private String name; private int age; // getters // setters } CodeSamples­Query import import import import com.hazelcast.core.Hazelcast; com.hazelcast.core.IMap; com.hazelcast.query.SqlPredicate; java.util.Collection; IMap map = Hazelcast.getMap("employees"); map.addIndex("active" ,false); map.addIndex("name" ,false); map.addIndex("age" ,true); Collection employees = map.values(new SqlPredicate("active AND age <= 30")); CodeSamples­Persistence import com.hazelcast.core.MapStore, import com.hazelcast.core.MapLoader, public class MyMapStore implements MapStore, MapLoader { public Object load (Object key) { return readFromDatabase(key); } public void store (Object key, Object value) { saveIntoDatabase(key, value); } public void remove(Object key) { removeFromDatabase(key); } } Persistence · WriteBehind:asynchronouslystoringentries · WriteThrough:synchronous · ReadThrough:ifget(key)isnull,loadit CodeSamples­ExecutorService FutureTask futureTask = new DistributedTask(new Echo(input), member); ExecutorService es = Hazelcast.getExecutorService(); es.execute(futureTask); String result = futureTask.get(); ExecutorServiceScenario public int addBonus(long customerId, int extraBonus){ IMap mapCustomers = Hazelcast.getMap("customers"); mapCustomers.lock (customerId); Customer customer = mapCustomers.get(customerId); int currentBonus = customer.addBonus(extraBonus); mapCustomers.put(customerId, customer); mapCustomers.unlock(customerId); return currentBonus; } SendcomputaConoverdata public class BonusAddTask implements Callable, Serializable{ private static final long serialVersionUID = 1L; private long customerId; private long extraBonus; public BonusAddTask () { } public BonusAddTask (long customerId, int extraBonus) { this.customerId = customerId; this.extraBonus = extraBonus; } public Integer call () { IMap mapCustomers = Hazelcast.getMap("customers"); mapCustomers.lock (customerId); Customer customer = mapCustomers.get(customerId); int currentBonus = customer.addBonus(extraBonus); mapCustomers.put(customerId, customer); mapCustomers.unlock(customerId); return currentBonus; } } SendcomputaConoverdata public int addBonus(long customerId, int extraBonus){ ExecutorService es = Hazelcast.getExecutorService(); FutureTask task = new DistributedTask(new BonusAddTask(customerId, extraBonus), customerId); es.execute(task); int currentBonus = task.get(); return currentBonus; } ConfiguraCon dev dev-pass 5701 224.2.2.3 54327 192.168.1.2-5 istanbul.acme 10.3.17.* 10000 60 1 60 10000 LRU 25 Internals:ClusterMembership · MulCcastandUnicastDiscovery · Everymembersendsheartbeatstotheoldest member · OldestMembermanagesthememberships ­Sendsmemberlist ­Tellsmemberstosynctheirdata Internals:SerializaCon · OpCmizedforString, byte[], Long, · CustomserializaConwith (com.hazelcast.nio.DataSerializable) · StandardJavaSerializaCon Integer Internals:SerializaCon public class private private private private Address implements com.hazelcast.nio.DataSerializable { String street; int zipCode; String city; String state; public Address() {} //getters setters.. public void writeData(DataOutput out) throws IOException { out.writeUTF(street); out.writeInt(zipCode); out.writeUTF(city); out.writeUTF(state); } public void readData (DataInput in) throws IOException { street = in.readUTF(); zipCode = in.readInt(); city = in.readUTF(); state = in.readUTF(); } } Internals:SerializaCon public class private private private private private Employee implements com.hazelcast.nio.DataSerializable { String firstName; String lastName; int age; double salary; Address address; //address itself is DataSerializable public Employee() {} public void writeData(DataOutput out) throws IOException { out.writeUTF(firstName); out.writeUTF(lastName); out.writeInt(age); out.writeDouble (salary); address.writeData (out); } public void readData (DataInput in) throws IOException { firstName = in.readUTF(); lastName = in.readUTF(); age = in.readInt(); salary = in.readDouble(); address = new Address(); address.readData (in); } } JVM1 map.put (key, value) JVM2 Call:MPut TCP/IP PacketProcessor Request: Datakey Datavalue Packet Owner? Yes ProcessRequest No Request: Datakey Datavalue Owner? ProcessRequest Internals:DistributedMap · Fixednumberofblocks(segments) · Eachkeyfallsintooneoftheseblocks · Eachblockisownedbyamember · Everymemberknowstheblockowners · blockId = hash(keyData) % BLOCK_COUNT · Blockownershipisreassigneduponmembership change · Blocksandkeysmigrateforloadbalancing PlannedFeatures · C#clients · Distributedjava.util.concurrent. · DistributedTupleSpace {DelayQueue, Semaphore, CountDownLatch} QuesCons? · hWp://www.hazelcast.com · hWp://code.google.com/p/hazelcast/ · hazelcast@googlegroups.com · TwiWer@oztalip · hWp://www.linkedin.com/in/talipozturk