您现在的位置是:首页 > 正文

java中使用akka手记三 cluster详例

2024-04-01 01:06:40阅读 7


http://www.tuicool.com/articles/m2muui

原文  http://2014.54chen.com/blog/2014/04/17/how-to-use-akka-in-java-3/

一个例子

  • 同样是typesafe的经典例子。
  • 例子提供的服务是传输文本。当文本发给frontend节点,它会委派backend节点,backend执行转化任务,把结果返回给原来的客户端。
  • 新的backend节点和frontend节点,都可以动态地在cluster上增减。

message

 
public interface TransformationMessages {

  public static class TransformationJob implements Serializable {
    private final String text;
//......
  }

  public static class TransformationResult implements Serializable {
    private final String text;
//.....
  }

  public static class JobFailed implements Serializable {
    private final String reason;
    private final TransformationJob job;
//....
  }

  public static final String BACKEND_REGISTRATION = "BackendRegistration";

}

backend处理逻辑

 
public class TransformationBackend extends UntypedActor {

  Cluster cluster = Cluster.get(getContext().system());
//...  
  @Override
  public void onReceive(Object message) {
    if (message instanceof TransformationJob) {
      TransformationJob job = (TransformationJob) message;
      getSender().tell(new TransformationResult(job.getText().toUpperCase()),
          getSelf());

    } else if (message instanceof CurrentClusterState) {
      CurrentClusterState state = (CurrentClusterState) message;
      for (Member member : state.getMembers()) {
        if (member.status().equals(MemberStatus.up())) {
          register(member);
        }
      }

    } else if (message instanceof MemberUp) {
      MemberUp mUp = (MemberUp) message;
      register(mUp.member());

    } else {
      unhandled(message);
    }
  }

  void register(Member member) {
    if (member.hasRole("frontend"))
      getContext().actorSelection(member.address() + "/user/frontend").tell(
          BACKEND_REGISTRATION, getSelf());
  }
}
  • backend订阅了cluster的事件,检测frontend节点,还会发一条消息告诉fontend可以使用了。
  • frontend节点接收用户的任务,扔给注册好的backend节点。

frontend节点

 
public class TransformationFrontend extends UntypedActor {
  List<ActorRef> backends = new ArrayList<ActorRef>();
  int jobCounter = 0;
  @Override
  public void onReceive(Object message) {
    if ((message instanceof TransformationJob) && backends.isEmpty()) {
      TransformationJob job = (TransformationJob) message;
      getSender().tell(
          new JobFailed("Service unavailable, try again later", job),
          getSender());

    } else if (message instanceof TransformationJob) {
      TransformationJob job = (TransformationJob) message;
      jobCounter++;
      backends.get(jobCounter % backends.size())
          .forward(job, getContext());

    } else if (message.equals(BACKEND_REGISTRATION)) {
      getContext().watch(getSender());
      backends.add(getSender());

    } else if (message instanceof Terminated) {
      Terminated terminated = (Terminated) message;
      backends.remove(terminated.getActor());

    } else {
      unhandled(message);
    }
  }
}
  • frontend用List 保存了backend的actor位置,有需要的时候就轮循发给backend。
  • getSender 本次收到消息的上游,一般用来回复消息。
  • getContext 本actor的上下文。
  • getContext().watch DeathWatch,相当于watch了谁,谁有啥公开动作就会告诉我,包括挂了之类的。
  • ActorRef.forward与tell、ask的区别,性能最好的是tell,发完就走。ask是发完等Future,要等的话性能是个问题。forward用于从一个actor转发消息给另一个actor,原始的sender信息会被保留,在做路由、负载均衡、备份时非常有用。

运行TransformationApp

  • sample.cluster.transformation.TransformationApp 启动三个backend 2551 2552 0为一个cluster,启动一个fronend。
  • frontend每5秒会收到一次任务,接收成功后print代码,代码如下:
 
system.scheduler().schedule(interval, interval, new Runnable() {
      public void run() {
        ask(frontend,
            new TransformationJob("hello-" + counter.incrementAndGet()),
            timeout).onSuccess(new OnSuccess<Object>() {
          public void onSuccess(Object result) {
            System.out.println(result);
          }
        }, ec);
      }

    }, ec);
  • frontend节点中,收到job的时候会去检查backend注册数是否可用了,如果有可用的就forward任务。
 
public void onReceive(Object message) {
    if ((message instanceof TransformationJob) && backends.isEmpty()) {
      TransformationJob job = (TransformationJob) message;
      getSender().tell(
          new JobFailed("Service unavailable, try again later", job),
          getSender());

    } else if (message instanceof TransformationJob) {
      TransformationJob job = (TransformationJob) message;
      jobCounter++;
      backends.get(jobCounter % backends.size())
              .forward(job, getContext());

    } else if (message.equals(BACKEND_REGISTRATION)) {
      getContext().watch(getSender());
      backends.add(getSender());

    } else if (message instanceof Terminated) {
      Terminated terminated = (Terminated) message;
      backends.remove(terminated.getActor());

    } else {
      unhandled(message);
    }
  }
  • 在backend中有一句代码如下:
 
void register(Member member) {
    if (member.hasRole("frontend"))
      getContext().actorSelection(member.address() + "/user/frontend").tell(
          BACKEND_REGISTRATION, getSelf());
  }
  • 解析:backend订阅了memberUp事件,所以在cluster中如果有memberUp了,都会执行上述代码。
  • actorSelection是根据地址进行lookup,返回一个ActorSelection,可以当成本地的actor一样tell。

代码


原创文章如转载,请注明:转载自五四陈科学院[ http://www.54chen.com ]

网站文章

  • fastadmin开发插件的基本流程

    fastadmin开发插件的基本流程

    什么是fastadmin? FastAdmin是一款基于ThinkPHP5+Bootstrap的极速后台开发框架。 fastadmin开发基础管理插件步骤 以一个学校管理插件为例 开发环境与工具 PHPSTORM phpstudy nginx1.15.11 mysql8.0 php7.3.4 准备工作 配置站点 我的域名设置为fast51admin.localhost....

    2024-04-01 01:06:10
  • ios swift 继承_Swift中的继承

    ios swift 继承 什么是继承? (What is Inheritance?) Inheritance allows a class to have the same behavior as o...

    2024-04-01 01:05:36
  • 基于顺序表实现栈的基本操作

    基于顺序表实现栈的基本操作

    栈:什么是栈?又该怎么理解呢?栈(stack)又名堆栈,它是一种运算受限的线性表。其限制是仅允许在表的一端进行插入和删除运算。这一端被称为栈顶,相对地,把另一端称为栈底。栈就是一个桶,后放进去的先拿出来,它下面本来有的东西要等它出来之后才能出来(先进后出)栈(Stack)是操作系统在建立某个进程时或者线程(在支持多线程的操作系统中是线程)为这个线程建立的存储区域,该区域具有FIFO的特性,在编

    2024-04-01 01:05:31
  • NTP配置时间同步

    Oracle RAC两个节点,配置NTP使得两个节点时间同步 1)节点1和节点2都启动NTP服务,节点2(客户端)向节点1(服务器端)同步 如果测试用,可修改节点2的时间不同于节点1的时间,...

    2024-04-01 01:05:25
  • linux主机加入组播组,linux下不同主机间实现组播(练习代码)

    在写聊天室程序时,遇到一个组播问题,不能在不同主机上进行发送接收消息。。。深究发现,是本人对sendto与recvfrom的理解太浅薄。以下代码发送端与接收端可在同一主机,也可在不同主机上运行。代码直...

    2024-04-01 01:05:18
  • 正则表达式总结

    第一点:--------------有关正则前沿介绍 正则表达式是用来进行文本处理的技术,是语言无关的,在几乎所有语言中都有实现。javascript中还会用到。一个正则表达式就是由普通字符以及特殊字符(称为元字符)组成的文字模式。该模式描述在查找文字主体时待匹配的一个或多个字符串。正则表达式作为一个模板,将某个字符模式与所搜索的字符串进行匹配。就像通配符“*.jpg”、“%ab%”,它...

    2024-04-01 01:04:52
  • 计算机系统结构李学干课后习题答案,计算机系统结构习题答案(李学干).doc

    计算机系统结构习题解答第一章习题一1.2一台经解释实现的计算机,可以按照功能划分成4级。每一级为了执行一条指令需要下一级的N条指令解释。若执行第1级的一条指令需K纳秒时间,那么执行第2、3、4级的一条...

    2024-04-01 01:04:45
  • 使用 JavaScript 将相对路径转换为绝对路径

    转自:http://baifa.me/2010/01/convert-to-absolute-path.html有时为了唯一标识网址或其它开发需要,我们需要将相对的网址转换为绝对的网址。当然前人实现方式已经不少,但或多或少的存在缺点或兼容问题。下面我将总结已有实现并给出相对完美的实现。常规实现:地址转换因该实现方式千变万化,故略去代码,但在开发时需考虑 base 标签的会带

    2024-04-01 01:04:38
  • python 根据中文表头标题抓取动态(表格)文档数据

    python 根据中文表头标题抓取动态(表格)文档数据

    思路如图左侧表头标题,要获得右侧数据。网页数据提取成汉字,表格数据间会有空格,用split()分隔成list。用index()查找某个汉字表头位置,输出list下一个位置既是要得到值 text2 ='网页纯文本,爬虫数据自己转换' # 字符串分割成列表,Python 分割字符串使用 变量.split("分割标示符号"[分割次数]),分割次数表示分割最大次数,为空则分割所有。...

    2024-04-01 01:04:12
  • 云计算和外包数据安全分析及建议

    本文讲的是云计算和外包数据安全分析及建议,【IT168 资讯】很多企业,要么已经配置了云计算,要么即将配置云计算。云计算是提高灵活性、减少成本的最新技术。通过提供捆绑、可升级的软件、基础设施、数据存储及通信解决方案,云计算供应商使公司节约了资金、避免了高成本IT承诺、获得了基于所需的有效系统规模,并且可迅速配置最新服务。  一。漫步云端前需要考虑的问题 ...

    2024-04-01 01:04:04