Saturday, July 13, 2013

ZK: Server Push with WebSocket


Introduction

This article describes how to do server push with WebSocket in ZK (this is just a proof of concept that uses a customized Intbox).

Environment: ZK 6.0.2, Tomcat 7.0.42, Latest Chrome/Firefox.

Result

View the demo online
http://screencast.com/t/8gu7FLic5Ul

Prerequisites

ZK Serve Push with Java Timer
http://ben-bai.blogspot.tw/2013/07/zk-serve-push-with-java-timer.html

ZK: Override Widget in zk.xml
http://ben-bai.blogspot.tw/2013/07/zk-override-widget-in-zkxml.html

Simple WebSocket Test with Tomcat
http://ben-bai.blogspot.tw/2013/07/simple-websocket-test-with-tomcat.html

Group Connections with WebSocket
http://ben-bai.blogspot.tw/2013/07/group-connections-with-websocket.html

Program

index.zul

Intbox with custom java class and control buttons.

<zk xmlns:w="client">
    <!-- Demo page for WebSocket based server push.
        Button events that carry an id are handled by test.TestComposer;
        the remaining buttons use inline onClick scripts. -->
    <div apply="test.TestComposer">
        Self:
        <!-- custom intbox:
            uses the custom java class test.Intbox, and its client
            widget is customized in WEB-INF/zk.xml to support WebSocket.

            The socket context 'self' means the component does not
            listen to any shared context; it only receives messages
            that are sent to this component itself. -->
        <intbox id="ibx" readonly="true"
            socketContext="self" use="test.Intbox" />
        <!-- button to start server push (TestComposer#start) -->
        <button id="startBtn" label="start" />
        <!-- button to stop server push (TestComposer#stop) -->
        <button id="stopBtn" label="stop" />
        <!-- button to show the server side value of intbox 'ibx',
            to make sure both client and server are updated properly -->
        <button label="show value" onClick="alert(ibx.getValue());" />
        <div />
        <!-- custom intbox that listens to socket context 'counter' -->
        Counter: <intbox id="ibxCounter" readonly="true"
            socketContext="counter" use="test.Intbox" />
        <div />
        <!-- custom intbox that listens to socket context 'negativeCounter' -->
        Negative Counter: <intbox id="ibxNegativeCounter" readonly="true"
            socketContext="negativeCounter" use="test.Intbox" />
        <div />
        <!-- button to push a new value to the contexts 'counter' and
            'negativeCounter' (TestComposer#updateCounterBtn) -->
        <button id="updateCounterBtn" label="updaet counter and negative counter" />
        <div />
        <!-- buttons to show the server side values of the two counter intboxes -->
        <button label="show value of counter" onClick="alert(ibxCounter.getValue());" />
        <button label="show value of negative counter" onClick="alert(ibxNegativeCounter.getValue());" />
        <!-- button to swap the socket contexts of 'ibxCounter' and
            'ibxNegativeCounter'; written in the zul page directly to keep
            the composer clear, since this is not the major part -->
        <button label="switch context">
            <attribute name="onClick"><![CDATA[
                String cOne = ibxCounter.getSocketContext();
                String cTwo = ibxNegativeCounter.getSocketContext();
                ibxCounter.setSocketContext(cTwo);
                ibxNegativeCounter.setSocketContext(cOne);
            ]]></attribute>
        </button>
    </div>
</zk>


TestComposer.java

Composer used to handle server push task/action.

package test;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import org.zkoss.zk.ui.select.SelectorComposer;
import org.zkoss.zk.ui.select.annotation.Listen;
import org.zkoss.zk.ui.select.annotation.Wire;

/**
 * Composer of index.zul; starts/stops a timer that pushes an increasing
 * value to the intbox 'ibx' via WebSocket, and pushes a shared counter to
 * the socket contexts 'counter' and 'negativeCounter'.
 *
 * Tested with ZK 6.0.2
 * @author benbai123
 *
 */
@SuppressWarnings("rawtypes")
public class TestComposer extends SelectorComposer {

    private static final long serialVersionUID = 5928314519324520566L;
    /** Intbox that receives the timer driven value. */
    @Wire
    Intbox ibx;
    /** Value pushed to 'ibx', one instance per composer. */
    private final AtomicInteger _cnt = new AtomicInteger(0);
    /** Value pushed to the shared contexts, one instance for all composers. */
    private final static AtomicInteger _cntCounter = new AtomicInteger(0);
    /**
     * Running push timer, or null while stopped.
     * Transient because java.util.Timer is not serializable.
     */
    private transient Timer timer;
    /**
     * Start server push with WebSocket for the specific component 'ibx'.
     * Does nothing if the push is already running.
     */
    @Listen("onClick = #startBtn")
    public void start () {
        if (timer == null) {
            // daemon thread: a push that is never stopped must not
            // keep the JVM (and thus the servlet container) alive
            timer = new Timer(true);
            timer.schedule(getTask(), 0, 1000);
        }
    }
    /**
     * Stop server push with WebSocket for the specific component 'ibx'.
     * Does nothing if the push is not running.
     */
    @Listen("onClick = #stopBtn")
    public void stop () {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }
    /**
     * Increase the shared counter and send it to context 'counter',
     * and its negation to context 'negativeCounter', via WebSocket.
     *
     * All components that listen to these contexts will be updated.
     */
    @Listen("onClick = #updateCounterBtn")
    public void updateCounterBtn () {
        String msg = "" + _cntCounter.incrementAndGet();
        TestWebSocketServlet.sendBySocketContext(msg, "counter");
        TestWebSocketServlet.sendBySocketContext("-" + msg, "negativeCounter");
    }
    /**
     * Create the task to be scheduled, a new instance is required for
     * each schedule call.
     * @return task that calls {@link #update()}
     */
    public TimerTask getTask () {
        return new TimerTask() {
            @Override
            public void run () {
                update();
            }
        };
    }
    /**
     * Push the current counter value to intbox 'ibx' and then
     * increase the counter.
     */
    public void update () {
        ibx.updateSelfValueWithWebSocket(_cnt.getAndIncrement());
    }
}


Intbox.java

Custom Intbox that supports WebSocket actions.

package test;

/**
 * Enhanced Intbox that support WebSocket action
 * @author benbai123
 *
 */
public class Intbox extends org.zkoss.zul.Intbox implements IWebsocketEnhancedComponent {

    private static final long serialVersionUID = 1711581315927992296L;
    /** context used to create socket connection
     * used by custom mapping rule
     */
    private String _socketContext = "";
    private String _mappingId;
    public void setSocketContext (String socketContext) {
        // no need to clear old connection,
        // old connection will be closed at client side
        // then trigger onClose method of connection at
        // server side to clear it
        if (socketContext == null) {
            socketContext = "";
        }
        if (!socketContext.equals(_socketContext)) {
            _socketContext = socketContext;
            // register if has context
            if (!socketContext.isEmpty()) {
                register();
            }
            smartUpdate("socketContext", getMappingContext());
        }
    }
    // getter
    public String getSocketContext () {
        return _socketContext;
    }
    public String getBaseId () {
        return getUuid();
    }
    public String getMappingId () {
        return _mappingId;
    }
    /**
     * append mapping id to context, will parse it in TestWebSocketServlet,
     * @see test.TestWebSocketServlet#createWebSocketInbound(String, javax.servlet.http.HttpServletRequest)
     * @return
     */
    private String getMappingContext () {
        return _socketContext + "_" + _mappingId;
    }
    /**
     * Update value to client side via WebSocket
     * two steps:
     * 1. Set value at server side without any 'update client' action
     * 2. Update value to client side via WebSocket
     * @param value
     * @see test.TestWebSocketServlet#sendByComponent(String, org.zkoss.zk.ui.Component)
     */
    public void updateSelfValueWithWebSocket (int value) {
        setValueDirectly(value);
        TestWebSocketServlet.sendByComponent(value+"", this);
    }

    public void notifyByWebSocket (String msg) {
        setValueDirectly(Integer.parseInt(msg));
    }
    /**
     * Register this component for notify back
     * @see test.TestWebSocketServlet#register(IWebsocketEnhancedComponent)
     * @see test.TestWebSocketServlet#sendBySocketContext(String, String)
     */
    private void register () {
        _mappingId = TestWebSocketServlet.register(this);
    }
    // render socketContext as needed
    protected void renderProperties(org.zkoss.zk.ui.sys.ContentRenderer renderer)
        throws java.io.IOException {
        super.renderProperties(renderer);
        if (!"".equals(_socketContext)) {
            render(renderer, "socketContext", getMappingContext());
        }
    }
}


zk.xml

Customize client widget to work with WebSocket.

<zk>
    <device-config>
        <device-type>ajax</device-type>
        <!-- script embedded into each page, it adds WebSocket support
            to the client side widgets -->
        <embed><![CDATA[
            <script type="text/javascript">
                zk.afterLoad("zul.inp", function () {
                    var _iwgt = {};
                    zk.override(zul.inp.InputWidget.prototype, _iwgt, {
                        // called when receive message from WebSocket
                        doWebSocketMessage_: function (msg) {
                            // update value of input node
                            jq(this.getInputNode()).val(msg.data);
                        }
                    });
                });
                zk.afterLoad("zul", function () {
                    var _wgt = {};
                    zk.override(zk.Widget.prototype, _wgt, {
                        // setter for set context of WebSocket,
                        // v is 'context_mappingId' or empty for no context
                        setSocketContext: function (v) {
                            this._socketContext = v;
                            // reconnect only if already attached to DOM,
                            // otherwise bind_ will do it later
                            if (this.$n())
                                this.initWebSocket();
                        },
                        bind_: function (dt, skipper, after) {
                            _wgt.bind_.apply(this, arguments);
                            // initiate WebSocket after bind_
                            this.initWebSocket();
                        },
                        // init WebSocket
                        initWebSocket: function () {
                            // close old at first
                            if (this.TestWebSocket)
                                this.TestWebSocket.disconnect();
                            // create new if a context specified
                            if (this._socketContext) {
                                var wgt = this;
                                this.TestWebSocket = {
                                    socket: null,
                                    connect: (function() {
                                        var loc = window.location,
                                            pathname = loc.pathname,
                                            // keep only the directory part so a page opened
                                            // as /app/index.zul still requests /app/xxx.wsreq
                                            dir = pathname.substring(0, pathname.lastIndexOf('/') + 1),
                                            // a page served over https may only use secure WebSocket
                                            scheme = loc.protocol == 'https:' ? 'wss://' : 'ws://',
                                            // .wsreq for servlet mapping
                                            host = scheme + loc.host + dir + wgt._socketContext + '.wsreq';
                                        if ('WebSocket' in window) {
                                            this.socket = new WebSocket(host);
                                        } else if ('MozWebSocket' in window) {
                                            this.socket = new MozWebSocket(host);
                                        } else {
                                            alert('Error: WebSocket is not supported by this browser.');
                                            return;
                                        }
                                        // process message from server
                                        this.socket.onmessage = function (msg) {
                                            wgt.doWebSocketMessage_(msg);
                                        };
                                    }),
                                    disconnect: function () {
                                        // close and clear, socket is null if the
                                        // browser does not support WebSocket
                                        if (this.socket) {
                                            this.socket.close();
                                            this.socket = null;
                                        }
                                        wgt.TestWebSocket = null;
                                    }
                                };
                                
                                this.TestWebSocket.connect();
                            }
                        },
                        doWebSocketMessage_: function (msg) {
                            // for child override
                        }
                    });
                });
            </script>
        ]]></embed>
    </device-config>
</zk>


IWebsocketEnhancedComponent.java

Define a "WebSocket Enhanced Component"

package test;

/**
 * Contract of a "WebSocket Enhanced Component", i.e., a component that
 * can be registered to TestWebSocketServlet and notified by it.
 * @author benbai123
 *
 */
public interface IWebsocketEnhancedComponent {
    /**
     * Base id of this component. It is not necessarily unique, components
     * in different sessions probably use the same base id.
     * @return the base id
     */
    String getBaseId ();
    /**
     * Mapping id of this component, should be unique in whole application.
     * @return the mapping id
     */
    String getMappingId ();
    /**
     * Called after a message is sent to the context this component
     * listens to.
     * @param msg the sent message
     * @see test.TestWebSocketServlet#sendBySocketContext(String, String)
     */
    void notifyByWebSocket (String msg);
}


TestWebSocketServlet.java

Servlet used to initiate and handle WebSocket connections.

package test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.servlet.http.HttpServletRequest;

import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import org.apache.catalina.websocket.WsOutbound;

/**
 * Servlet that creates the WebSocket connections of the enhanced
 * components and provides static methods to send messages to them,
 * either by socket context or by component.
 *
 * Tested with Tomcat 7.0.42 and ZK 6.0.2
 * @author benbai123
 *
 */
public class TestWebSocketServlet extends WebSocketServlet {

    private static final long serialVersionUID = -7663708549630020769L;

    // hold each connection
    private static final Set<TestMessageInbound> connections =
        new CopyOnWriteArraySet<TestMessageInbound>();
    // hold each related component, key is mapping id
    private static final Map<String, IWebsocketEnhancedComponent> registeredComponents =
        new Hashtable<String, IWebsocketEnhancedComponent>();

    /**
     * For create connection only, each connection will
     * handle it self as needed.
     *
     * The last segment of request uri is expected to be
     * context_mappingId.wsreq, e.g., counter_a1234.wsreq. The uri is
     * provided by client so it is not trusted: a segment without
     * underscore is treated as a context without mapping id.
     */
    @Override
    protected StreamInbound createWebSocketInbound(String subProtocol,
            HttpServletRequest request) {
        // request uri
        String uri = request.getRequestURI();

        // infos within uri, format is context_mappingId.wsreq e.g., counter_a1234.wsreq
        String infos = uri.substring(uri.lastIndexOf('/') + 1).replace(".wsreq", "");
        // context may contain underscore so split at the last one
        int separator = infos.lastIndexOf('_');
        String context;
        String compMappingId;
        if (separator < 0) {
            // malformed request, no component can be mapped to it
            context = infos;
            compMappingId = "";
        } else {
            context = infos.substring(0, separator);
            compMappingId = infos.substring(separator + 1);
        }

        return new TestMessageInbound(context, compMappingId);
    }
    /**
     * A WebSocket connection of one component. Static since it only
     * works with the static collections of the servlet and does not
     * need a reference to the servlet instance.
     */
    private static final class TestMessageInbound extends MessageInbound {
        // hold context and mappingId of related component
        private final String _context;
        private final String _compMappingId;
        // constructor
        public TestMessageInbound (String context, String compMappingId) {
            _context = context;
            _compMappingId = compMappingId;
        }
        // add self instance into connections Set while opened
        @Override
        protected void onOpen(WsOutbound outbound) {
            connections.add(this);
        }
        // remove self instance from connections set and
        // clear component reference while closed
        @Override
        protected void onClose(int status) {
            connections.remove(this);
            registeredComponents.remove(_compMappingId);
        }
        // ignore binary message
        @Override
        protected void onBinaryMessage(ByteBuffer message) throws IOException {
            // ignore
        }
        // ignore text message
        @Override
        protected void onTextMessage(CharBuffer message) throws IOException {
            // ignore
        }
        /**
         * get socket context of this connection
         * @return the socket context parsed from request uri
         */
        public String getContext () {
            return _context;
        }
        /**
         * get mappingId of related component of this connection
         * @return the mapping id parsed from request uri
         */
        public String getCompMappingId () {
            return _compMappingId;
        }
    }

    /**
     * send message via WebSocket with specified socket context
     * all components that connect to this context will be updated
     *
     * The context 'self' is never a target. A connection that fails to
     * send is skipped and its component is not notified.
     * @param msg message to send
     * @param socketContext target socket context
     */
    public static void sendBySocketContext (String msg, String socketContext) {
        for (TestMessageInbound connection : connections) {
            String context = connection.getContext();
            // ignore self context and other different context
            if ("self".equals(context) || !context.equals(socketContext)) {
                continue;
            }
            try {
                // send message via WebSocket
                connection.getWsOutbound().writeTextMessage(CharBuffer.wrap(msg));
            } catch (IOException ignored) {
                // best effort, a broken connection should not
                // stop the update of other connections
                continue;
            }
            // pass sent message to component
            // so the component can update itself if needed,
            // there is no component if the mapping id from client is
            // unknown or the registration is already removed
            IWebsocketEnhancedComponent comp =
                registeredComponents.get(connection.getCompMappingId());
            if (comp != null) {
                comp.notifyByWebSocket(msg);
            }
        }
    }
    /**
     * send message via WebSocket to specific component
     * @param msg message to send
     * @param comp target component
     */
    public static void sendByComponent (String msg, IWebsocketEnhancedComponent comp) {
        for (TestMessageInbound connection : connections) {
            try {
                // send message to specific component
                if (connection.getCompMappingId().equals(comp.getMappingId())) {
                    connection.getWsOutbound().writeTextMessage(CharBuffer.wrap(msg));
                }
            } catch (IOException ignored) {
                // best effort, a broken connection should not
                // stop the update of other connections
            }
        }
    }
    /**
     * register component so can notify it as needed
     *
     * The mapping id is the base id of component, followed by the first
     * number (starts from 2) that makes it unused if the base id is
     * already registered.
     * @param comp component to register
     * @return String, mapping id for specified component
     */
    public static String register (IWebsocketEnhancedComponent comp) {
        String id = comp.getBaseId();
        String mappingId = id;
        int i = 2;
        // check it since components in different sessions probably use the same id
        synchronized (registeredComponents) {
            while (registeredComponents.containsKey(mappingId)) {
                mappingId = id + i;
                i++;
            }
            registeredComponents.put(mappingId, comp);
        }
        return mappingId;
    }
}


web.xml

Define servlet mapping.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Deployment descriptor of the WebSocket server push demo -->
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0"> 
    <!-- servlet that creates the WebSocket connections -->
    <servlet>
        <servlet-name>testWebSocketServlet</servlet-name>
        <servlet-class>test.TestWebSocketServlet</servlet-class>
    </servlet>
    <!-- the client widget requests urls that end with .wsreq
        to open a WebSocket connection (see zk.xml) -->
    <servlet-mapping>
        <servlet-name>testWebSocketServlet</servlet-name>
        <url-pattern>*.wsreq</url-pattern>
    </servlet-mapping>
    <!-- index.zul is served when the context root is requested -->
    <welcome-file-list>
        <welcome-file>index.zul</welcome-file>
    </welcome-file-list>
</web-app>


Download

Full project on GitHub
https://github.com/benbai123/ZK_Practice/tree/Tag_for_Component_Based_WebSocket_ServerPush/Pattern/ServerPush/ServerPushWithWebSocket

Demo Flash
https://github.com/benbai123/ZK_Practice/blob/master/demo_src/swf/Pattern/ServerPush/ServerPushWithWebSocket.swf

No comments:

Post a Comment