Asdfasf

Thursday, September 06, 2012

Broadcasting with Hazelcast


Farkli host'larda clustered calisan uygulamalarimizin birbirleri ile haberlesmelerine ihtiyacimiz var. Host'lardan birinde gerceklesen bir event'in diger hostlar tarafindan haberdar edilmesi lazim. Bu amacla ve daha bir cok ozelligi ile (distributed caching, load balancing, publish/subscriber messaging) ve en onemlisi kullanim basitligi ile Hazelcast kullanabiliriz.

Asagida Hazelcast kullanarak bir broadcasting ornegi yapalim.

Dependency'miz

 <dependency>  
      <groupId>com.hazelcast</groupId>  
      <artifactId>hazelcast</artifactId>  
      <version>2.2</version>  
 </dependency>  

Hazelcast servisimizi ayaga kaldiralim:

    import org.apache.log4j.Logger;   
    import com.hazelcast.config.Config;   
    import com.hazelcast.config.Join;   
    import com.hazelcast.config.NetworkConfig;   
    import com.hazelcast.core.Hazelcast;   
    import com.hazelcast.core.HazelcastInstance;   
    import com.hazelcast.core.MultiTask;   

    public class HazelcastService{  

    public static Logger    logger = Logger.getLogger(HazelcastService.class);   
    private static HazelcastInstance hazelcastInstance;   
    private static boolean   initialized = false;   

    public void deploy() {   
    logger.info("INITIALIZING HAZELCAST......");   
    try {   
     System.setProperty("hazelcast.logging.class",    HazelcastLoggerFactory.class.getName());    
     Config cfg = new Config();   
     cfg.setPort(ServiceProperties.HAZELCAST_PORT);   
     cfg.setPortAutoIncrement(true);   
     NetworkConfig network = cfg.getNetworkConfig();   
     Join join = network.getJoin();   
     join.getMulticastConfig().setEnabled(false);   
     String ips = ServiceProperties.HAZELCAST_CLUSTER_HOST_IPS_SEPERATED_BY_COMMA;   
     StringTokenizer tok = new StringTokenizer(ips, ",");   
     while (tok.hasMoreTokens()) {   
      join.getTcpIpConfig().addMember(tok.nextToken());   
     }   
     join.getTcpIpConfig().setEnabled(true);   
     hazelcastInstance = Hazelcast.newHazelcastInstance(cfg);   
     initialized = true;   
    } catch (Exception e) {   
     logger.error(e);   
    }   
    logger.info("INITIALIZED HAZELCAST......");   
   }   
 }  

Mesaj gonderimini tetikleyelim:

 public static void notifyEvent(String eventId) {  
     if (!initialized) {  
       logger.warn("HazelcastService is not initialized, doint nothing");  
       return;  
     }  
     try {  
       Event event = new Event(eventId);  
       MultiTask<Boolean> task = new MultiTask<Boolean>(event, hazelcastInstance.getCluster().getMembers());        
       broadcast(task);  
     } catch (Exception e) {  
       logger.error(e);  
     }  
   }  
      private static void broadcast(MultiTask<Boolean> task) throws InterruptedException, ExecutionException, TimeoutException {  
     ExecutorService executorService = hazelcastInstance.getExecutorService();  
     executorService.execute(task);  
     Collection<Boolean> results = task.get(3, TimeUnit.SECONDS);  
     logger.info("Call Results: " + Arrays.toString(results.toArray()));  
   }  

Diger hostlarda mesaj'i karsilayacak class'imiz

 public class Event implements Callable<Boolean>, Serializable {  
   protected String eventId;  
   public PricePlanUpdated(eventId) {  
     super();  
     this.eventId = eventId;      
   }  
   public Boolean call() {  
     try {  
       HazelcastService.logger.info("Received event message with eventId:" + eventId);  
       //do job  
       HazelcastService.logger.info("Processed event message with eventId:" + eventId);  
     } catch (Exception e) {  
       HazelcastService.logger.error(e);  
     }  
     return true;  
   }    
 }  

That's all.

No comments: