频道是Phoenix中非常exciting和强大的一部分。它让我们能简单地为应用添加软实时特性。频道基于一个简单的想法 - 收发信息。发送者发布关于话题的信息。接收者关注了这个话题,然后他们就能得到那些信息。发送者和接收者在任何时候都能互换角色。
Elixir本身就是基于信息传送的,你可能会想知道为什么我们需要这个额外的机制来收发信息。使用频道,发送者和接收者都可以不是Elixir进程。它们可以使任何能与频道交流的东西 - JavaScript客户端,iOS应用,另一个Phoenix应用,我们的手表。而且,一个频道中广播的信息可能会有很多接收者。而Elixir进程的交流是一对一的。
“频道”这个词很难描述一个有着许多组件的分层系统。让我们快速浏览一下它们,以便看到它的全貌。
配件
- Socket Handlers
Phoenix有一个到服务器的连接,在这个连接上会多路复用你的频道sockets。Socket handlers,例如web/channels/user_socket.ex
,是一个模块,用于验证和鉴定一个socket连接,并允许你为所有频道设置默认socket。
- Channel Routes
它们定义与Socket handlers中,例如web/channels/user_socket.ex
,这使得它们与别的routes不同。它们匹配与话题字符串,并调遣匹配任务 到给定的频道模块。星形符*
的作用像是通配符,所以在下面的route例子中,对sample_topic:pizza
和sample_topic:oranges
的请求都会被调遣到SampleTopicChannel
。
channel "sample_topic:*", HelloPhoenix.SampleTopicChannel
- Channels
Channels处理客户端来的事件,所以和控制器类似,但有两个关键的不同。Channel 事件是双向的 - 进和出。频道连接也存在于单个请求/回应的循环之外。频道是Phoenix中实时交流组件的最高等级抽象。
每个频道都会为join/3
, terminate/2
, handle_in/3
, 和 handle_out/3
这四个回调函数中的每一个实现一个或更多从句。
- PubSub
Phoenix PubSub层由Phoenix.PubSub
模块和各种不同适配器和它们的GenServer
的模块组成。这些模块中包含了组成频道交流的函数 - 关注话题,取消关注,在话题内广播信息。
如果需要的话,可以定义我们自己的PubSub适配器。请到 查看更多。
单独使用这些模块对于Phoenix来说毫无意义。Channels使用它们作为自己的发动机。作为用户,我们不需要在应用中直接使用它们。
- Messages
Phoenix.Socket.Message
模块用下面的keys定义了一个结构,来表示合法message。。
-
topic
- 话题字符串,或话题:子话题命名空间对,比如“messages”, “messages:123” -
event
- 事件名字符串,例如“phx_join” -
payload
- 消息有效载荷 -
ref
- 独特字符串ref -
Topics
Topics是字符串id - 不同层使用的名字,为了确认消息在正确的地方结束。正如我们在上面看到的,topics可以使用通配符。这对"topic:subtopic"很有用。你经常会使用你的model层中的记录ID来组成topics,例如"users:123"
。
- Transports
transport层是公路的最上层。Phoenix.Channel.Transport
模块负责调遣所有message对一个Channel的进出。
- Transport Adapters
默认的transport机制来源于WebSockerts,如果WebSockets不可用,它会退回到LongPolling。使用其它的交通适配器是可能的,只要遵守适配器协议,我们也可以自己编写一个。例子请看Phoenix.Transports.WebSocket
。
- 客户端库
Phoenix现在装载了它自己的JavaScript客户端。, , 还有 。
尝试它们全部
让我们构建一个简单的聊天应用,来将这些东西都用上。在 之后,我们会看到endpoint已经为我们设置好了,在lib/hello_phoenix/endpoint.ex
中:
defmodule HelloPhoenix.Endpoint do use Phoenix.Endpoint, otp_app: :hello_phoenix socket "/socket", HelloPhoenix.UserSocket ...end
我们在endpoint中所指向的HelloPhoenix.UserSocket
,已经生成应用时创建好了,在web/channels/user_socket.ex
中。我们需要确认messages已经获得了到正确的channel的route。所以,我们将对"room:*" channnel定义取消注释:
defmodule HelloPhoenix.UserSocket do use Phoenix.Socket ## Channels channel "room:*", HelloPhoenix.RoomChannel ...
现在,无论何时,一个客户端发送一个话题开头为"room:"
的消息,都会route到我们的RoomChannel。下一步,我们将定义一个HelloPhoenix.RoomChannel
模块来管理我们的聊天室消息。
加入频道
你的channels要做的第一件事就是授权客户端加入一个给定的topic。为了授权,我们必须在web/channels/room_channel.ex
中实现join/3
。
defmodule HelloPhoenix.RoomChannel do use Phoenix.Channel def join("room:lobby", _message, socket) do {:ok, socket} end def join("room:" <> _private_room_id, _params, _socket) do {:error, %{reason: "unauthorized"}} endend
对于我们的聊天app,我们允许任何人加入"room:lobby"
话题,但是其他任何房间都会被认为是私人的并需要特殊授权,从数据库的角度说,是被要求的。这里,我们不必管那些私人聊天室。为了授权socket加入topic,我们返回{:ok, socket}
或{:ok, reply, socket}
。拒绝访问,我们会返回{:error, reply}
。关于token授权的更多信息,请看 。
channel已经设置好,让我们的客户端和服务器开始对话吧。
Phoenix项目使用 来构建,除非你在运行mix phoenix.new
时加上了--no-brunch
选项。
如果你使用了brunch,在web/static/js/socket.js
中会有一个基于socket实现定义的简单客户端。
我们可以使用这个库来连接到我们的socket并加入我们的channel,我们只需要将房间名"room:lobby" 放到那个文件中。
// web/static/js/socket.js...socket.connect()// Now that you are connected, you can join channels with a topic:let channel = socket.channel("room:lobby", {})channel.join() .receive("ok", resp => { console.log("Joined successfully", resp) }) .receive("error", resp => { console.log("Unable to join", resp) })export default socket
之后,我们需要确认web/static/js/socket.js
被import到了我们的应用JavaScript文件中。所以,将web/static/js/app.js
的最后一行取消注释。
...import socket from "./socket"
保存文件,你的浏览器应该会自动刷新,感谢Phoenix的热重载功能。如果一切正常,你会在浏览器的JavaScript控制台中看到"Joined successfully" 。我们的客户端和服务器现在通过一个持续的连接在对话。现在来开启聊天功能。
在web/templates/page/index.html.eex
中,我们将用一个可以容纳我们的聊天消息的容器,和一个输入框来代替已存在的代码:
我们也将在web/templates/layout/app.html.eex
中添加jQuery到应用的layout:
... <%= render @view_module, @view_template, assigns %>
现在让我们添加几个事件监听器到web/static/js/socket.js
:
...let channel = socket.channel("room:lobby", {})let chatInput = $("#chat-input")let messagesContainer = $("#messages")chatInput.on("keypress", event => { if(event.keyCode === 13){ channel.push("new_msg", {body: chatInput.val()}) chatInput.val("") }})channel.join() .receive("ok", resp => { console.log("Joined successfully", resp) }) .receive("error", resp => { console.log("Unable to join", resp) })export default socket
我们要做的就是,监测到回车被按下,然后push
一个包含了消息本体的事件到channel。我们为这个事件取名为"new_msg"。让我们继续构建这个聊天应用的其他部分,包括监听新的消息和追加它们到消息容器中。
...let channel = socket.channel("room:lobby", {})let chatInput = $("#chat-input")let messagesContainer = $("#messages")chatInput.on("keypress", event => { if(event.keyCode === 13){ channel.push("new_msg", {body: chatInput.val()}) chatInput.val("") }})channel.on("new_msg", payload => { messagesContainer.append(`[${Date()}] ${payload.body}`)})channel.join() .receive("ok", resp => { console.log("Joined successfully", resp) }) .receive("error", resp => { console.log("Unable to join", resp) })export default socket
我们使用channel.on
来监听"new_msg"
事件,然后将消息本体追加到DOM。现在让我们来处理服务器上进出的消息,完成最后的步骤。
传入事件
我们使用handle_in/3
来处理传入事件。我们可以对事件名进行模式匹配,类似"new_msg"
,然后抓住客户端传送给channel的payload。对于我们的聊天应用,我们只需要通过broadcast!/3
通知所有其他room:lobby
的关注者,有新信息。
defmodule HelloPhoenix.RoomChannel do use Phoenix.Channel def join("room:lobby", _message, socket) do {:ok, socket} end def join("room:" <> _private_room_id, _params, _socket) do {:error, %{reason: "unauthorized"}} end def handle_in("new_msg", %{"body" => body}, socket) do broadcast! socket, "new_msg", %{body: body} {:noreply, socket} end def handle_out("new_msg", payload, socket) do push socket, "new_msg", payload {:noreply, socket} endend
broadcast!/3
会通知所有已加入这个socket
的话题的客户端,并调用它们的handle_out/3
回调。handle_out/3
不是一个必须的回调,但它允许我们自定义和过滤广播,在它们到达每个客户端之前。默认的,handle_out/3
就只是简单地push消息到客户端,就像我们这里定义的一样。而将它和传出事件联系在一起,就能进行强大的消息自定义和过滤。
拦截传出事件
我们不会为应用实现这个,但想象一下,我们的聊天app允许用户忽略新用户加入房间的通知。我们可以这样实现它,明确地告知Phoenix,我们想要拦截的传出事件,然后为这些事件定义一个handle_out/3
回调。(当然,需要假设我们有一个带有ignoring?/2
函数的User
model,而且我们通过assigns
映射来传入user。)
intercept ["user_joined"]def handle_out("user_joined", msg, socket) do if User.ignoring?(socket.assigns[:user], msg.user_id) do {:noreply, socket} else push socket, "user_joined", msg {:noreply, socket} endend
这就是我们的基础聊天app。打开多个浏览器窗口,你会看到你的消息被push并广播到了所有窗口!
Socket Assigns
与连接结构,%Plug.Conn{}
,类似,也有可能将值赋给一个channel socket。Phoenix.Socket.assign/3
可以很方便地以assign/3
的形式import到一个channel模块中:
socket = assign(socket, :user, msg["user"])
Sockets将得到的值以映射形式存放在socket.assigns
。
容错性和可靠性
服务器重启,网络中断,客户端失去连接。为了设计出健壮的系统,我们需要理解Phoenix是如何响应这些事件,以及是什么在保证它们。
- 处理重连
客户端关注了话题,Phoenix将这些关注选项存放在一个内存中的ETS表格里。如果一个channel崩溃了,客户端会需要重连到它们之前关注了的话题。幸运的是,Phoenix的JavaScript客户端知道怎么做。服务器会通知所有客户端崩溃的消息。这会触发每个客户端的Channel.onError
回调。客户端会试图使用一个指数后退策略来重连到服务器。一旦重连,它们会试图加入它们之前关注了的话题。一旦成功,它们会开始从这些话题继续接受消息。
- 重新发送客户端消息
channel客户端将传出消息按队列放入一个PushBuffer
,并在有连接时将它们发送到服务器。没有连接时,客户端会保留这些消息,直到建立新的连接,或者直到收到一个timeout
事件。默认的超时事件是5000毫秒。客户端不会将这些消息保留在本地存储中,所以如果浏览器窗口关闭了,消息就会消失。
- 重新发送服务器消息
Phoenix在发送消息给客户端时,会使用一个at-most-once策略。如果客户端掉线了并丢失了消息,Phoenix不会重新发送。Phoenix不会再服务器上保留消息。如果服务器重启,未发送的消息会消失。如果我们的应用需要保证消息的送达,我们就要自己写那些代码。常用方法包括保留消息在服务器上,以及让客户端请求丢失的消息。例如,看看Chris McCord的Phoenix练习: 以及 。
应用范例
想看看我们刚才做的应用的范例,请到()。
你也可以看看这个demo()。