博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop Yarn事件处理框架源码分析
阅读量:6575 次
发布时间:2019-06-24

本文共 3832 字,大约阅读时间需要 12 分钟。

由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。

 在事件处理之前,先注册相应的事件处理handler,收到事件event后,由派发事件的Dispatcher进行派发,默认采用异步事件处理方式将事件放到事件队列(LinkedBlockingQueue)中,消费者会循环从队列中取出事件进行处理。

要使用事件处理,首先需要创建Dispatcher,示例代码如下:

dispatcher = new AsyncDispatcher();//创建  addIfService(dispatcher);// 由于继承AbstractService,可以方便完成服务统一管理,比如初始化和资源释放等操作  dispatcher.register(EventType.class,new EventHandler());//注册对应的事件处理方法

然后通过AsyncDispatcher调用getEventHandler()返回的EventHandler的处理对应事件,AsyncDispatcher类的getEventHandler()方法如下:

@Override  public EventHandler getEventHandler() {    if (handlerInstance == null) {      handlerInstance = new GenericEventHandler();//如果没有注册生产事件处理,就走通用事件处理    }    return handlerInstance;  }
class GenericEventHandler implements EventHandler
{    public void handle(Event event) {      if (blockNewEvents) {        return;      }      /* all this method does is enqueue all the events onto the queue */      int qSize = eventQueue.size();      if (qSize !=0 && qSize %1000 == 0) {        LOG.info("Size of event-queue is " + qSize);      }      int remCapacity = eventQueue.remainingCapacity();      if (remCapacity < 1000) {        LOG.warn("Very low remaining capacity in the event-queue: "            + remCapacity);      }      try {        eventQueue.put(event);//放进队列        drained = false;      } catch (InterruptedException e) {        if (!stopped) {          LOG.warn("AsyncDispatcher thread interrupted", e);        }        throw new RuntimeException(e);      }    };  }

上述完成生产,再看消费如下实现的:

@Overrideprotected void serviceStart() throws Exception {  //start all the components  super.serviceStart();  eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程  eventHandlingThread.setName("AsyncDispatcher event handler");  eventHandlingThread.start();}

查看createThread()方法,如下所示:

Runnable createThread() {    return new Runnable() {      @Override      public void run() {        while (!stopped && !Thread.currentThread().isInterrupted()) {          drained = eventQueue.isEmpty();          // blockNewEvents is only set when dispatcher is draining to stop,          // adding this check is to avoid the overhead of acquiring the lock          // and calling notify every time in the normal run of the loop.          if (blockNewEvents) {            synchronized (waitForDrained) {              if (drained) {                waitForDrained.notify();              }            }          }          Event event;          try {            event = eventQueue.take();          } catch(InterruptedException ie) {            if (!stopped) {              LOG.warn("AsyncDispatcher thread interrupted", ie);            }            return;          }          if (event != null) {            dispatch(event);//分发事件          }        }      }    };  }

从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示:

protected void dispatch(Event event) {  //all events go thru this loop  if (LOG.isDebugEnabled()) {    LOG.debug("Dispatching the event " + event.getClass().getName() + "."        + event.toString());  }  Class
type = event.getType().getDeclaringClass(); try{ EventHandler handler = eventDispatchers.get(type); //通过event获取事件类型,根据事件类型得到注册的EventHandler if(handler != null) { handler.handle(event); //EventHandler处理事件event } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false) { LOG.info("Exiting, bbye.."); System.exit(-1); } }}
整个过程使用生产--消费者模型,异步事件处理,整体实现起来还是很简单的!

转载地址:http://opwno.baihongyu.com/

你可能感兴趣的文章
Windows 下的坐标系
查看>>
IHttpModule与IHttpHandler的区别整理
查看>>
4.2
查看>>
本地windows下新建kafka生产消费数据
查看>>
mysql待整理
查看>>
Amazon S3 API
查看>>
Autofac
查看>>
滑动侧边栏
查看>>
UIView layer 的对应关系
查看>>
新浪研发中心: Berkeley DB 使用经验总结
查看>>
windbg调试句柄泄露
查看>>
好好理解返回值引用
查看>>
理清文本编码
查看>>
实用linux命令
查看>>
mysql之 percona-xtrabackup 2.4.7安装(热备工具)
查看>>
CCF NOI1150 确定进制
查看>>
SpringBoot实战总汇--详解
查看>>
Windows 7,无法访问internet,DNS无响应
查看>>
2018年7月1日笔记
查看>>
尝试使用iReport4.7(基于Ubuntu Desktop 12.04 LTS)
查看>>