Dec 8, 2011

Messaging with queue in Azure Service being RESTful


I was re-doing some of the labs in the Windows Azure Platform Training Kit (WAPTK) when one of then didn't works: MessagingWithQueue. A got this error message:

Could not connect to net.tcp://xxxx.servicebus.windows.net:9354/. 
The connection attempt lasted for a time span of 00:00:00.0615234. 
TCP error code 10061: No connection could be made because the target machine actively refused it.

Quickly this message is saying that the port 9354 need to be open.  So I was going to ask to open this port, but then I ask my self: “What if I couldn't?” Does Azure service Bus suppose to be super flexible and give me the opportunity to you all kind of connection? Of course it does, so I decide to make a RESTful version of this lab.

Let’s begin

Create a new Cloud project in Visual Studio, and add a web role. In fact, regular web project will work just fine, but to keep it close to the original lab I will start with a cloud on. In the content folder add preloader.gif and override the Site.css (all the code, images and files are available here). In the View / Shared folder override the Site.Master and in View / Home override Index.aspx.  You can run now the application, you should see something like that:

WebPage

Create Azure AppFabric Service Bus

On the Azure management portal at windows.azure.com In the Service Bus section, create a new Service Bus.  You will need the Service Gateway, the Default Issuer (always “owner” in the CTP) and the Default Key.

Create New Queues

To Create a queue the button “Create” in section B will send the text, the queue name, to the Home controller and let us know the result.  To do that let’s add some JavaScript /JQuery code in the Index page.
var getQueuesUrl = '< %= Url.Action("Queues") % >';

$(document).ready(function () {
    loadQueues();
    
    $("#createQueue").submit(function (event) {
        event.preventDefault();
        var url = $(this).attr('action');
        var queueName = $("#queueName").val();
        $(".loading").show();
        $("#send").attr("disabled", "true");
        $("#retrieve").attr("disabled", "true");
        $.post(url, { queueName: queueName })
            .success(function (response) { renderCreateQueueStatus(response); })
            .error(function () { renderCreateQueueStatus(false); });
    });
});

function renderCreateQueueStatus(response) {
    if (response) {
        $("#createQueueStatus").html("Queue created successfully!");
        loadQueues();
    } else {
        $("#createQueueStatus").html("An error occurred, please try again later.");
    }
    $(".loading").hide();
    $("#send").attr("disabled", "");
    $("#retrieve").attr("disabled", "");
}

function loadQueues() {
    $.get(getQueuesUrl).success(function (response) {
        var ul = $("fieldset.center > ul");
        var sendMsgCombo = $("#sendMessageQueue");
        var receiveMsgCombo = $("#retrieveMessageQueue");

        sendMsgCombo.children().remove();
        receiveMsgCombo.children().remove();
        ul.children().remove();

        for (var i = 0; i < response.length; i++) {
            var item = response[i];
            sendMsgCombo.append('<option value="' + item.Name + '">' + item.Name + '</option>');
            receiveMsgCombo.append('<option value="' + item.Name + '">' + item.Name + '</option>');
            ul.append('<li><label>' + item.Name + '</label><div class="msgCountOf' + item.Name.replace(/ /g, '') + '" style="float:right"><label>Messages</label></div></li>');
            updateMessageCountOf(item.Name, item.Messages);
        }
    });

    $(".loading").hide();
}

function updateMessageCountOf(queueName, numberOfMessages) {
 var message = numberOfMessages + " Messages";
 if (numberOfMessages == "0") message = "No Messages";
 if (numberOfMessages == "1") message = numberOfMessages + " Message";
 $("div.msgCountOf" + queueName.replace(/ /g, '') + " > label").html(message);
}
  

Once the document is ready loadQueues() is called.  This function will loop through a list of queues name and fill the two listbox  and the build the middle list .

Using Jquery the $("#createQueue") add a submit function to the button with the ID createQueue and on the success or error will call the function renderCreateQueueStatus to update the content of the Label createQueueStatus.  Then recall loadQueues() so it can refresh the queues lists.

On the server side now we will need a function CreateQueue that accept a string parameter as queue name and return a JsonResult.  This function should act as a HttpPost. To communicate a token is needed.  This is done by the primary call to Index. It’s creating a token for us with the issuer name and issuer secret of our Service bus with the function GetToken.

Put the information about your Service Bus (Service Gateway, Issuer and Key) in the Settings.The default constructor load this information.

To update the list of our queues we will use a function Queues().  This will download the information from “$Resources/Queues” and build a array of Json object with properties: Name and Messages.  It took me some time before this Linq query works, the tricks is to use the namespace when looking for a node.

public class HomeController : Controller
{
    private String mServiceNamespace;
    private static String mBaseAddress;
    private static String mToken;
    private String mIssuerName;
    private String mIssuerSecret;

    private const String SBHOSTNAME = "servicebus.windows.net";
    private const String ACSHOSTNAME = "accesscontrol.windows.net";

    private const String ATOMNS = "{http://www.w3.org/2005/Atom}";
    private const String SBNS = "{http://schemas.microsoft.com/netservices/2010/10/servicebus/connect}";

    public HomeController()
    {

        mServiceNamespace = RoleEnvironment.GetConfigurationSettingValue("namespaceAddress");
        mIssuerName = RoleEnvironment.GetConfigurationSettingValue("issuerName");
        mIssuerSecret = RoleEnvironment.GetConfigurationSettingValue("issuerSecret");
        mBaseAddress = "https://" + mServiceNamespace + "." + SBHOSTNAME + "/";
    }

    public ActionResult Index()
    {

        try
        {
            // Get a SWT token from the Access Control Service, given the issuerName and issuerSecret values.
            mToken = GetToken(mIssuerName, mIssuerSecret);
        }
        catch (WebException we)
        {
            using (HttpWebResponse response = we.Response as HttpWebResponse)
            {
                if (response != null)
                {
                    ViewBag.Message += Environment.NewLine + (new StreamReader(response.GetResponseStream()).ReadToEnd());
                }
                else
                {
                    ViewBag.Message += Environment.NewLine + (we.ToString());
                }
            }
        }
        return View();
    }

    [HttpPost]
    public JsonResult CreateQueue(String queueName)
    {
        try
        {
            var _queueAddress = mBaseAddress + queueName;
            var _webClient = GetWebClient();

            var _putData = @"<entry xmlns=""http://www.w3.org/2005/Atom"">
                                <title type=""text"">" + queueName + @"</title>
                                <content type=""application/xml"">
                                <QueueDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"" />
                                </content>
                             </entry>";

            byte[] _response = _webClient.UploadData(_queueAddress, "PUT", Encoding.UTF8.GetBytes(_putData));
            var _queueDescription = Encoding.UTF8.GetString(_response);

            return Json(_queueDescription, JsonRequestBehavior.AllowGet);
        }
        catch
        {
            return Json(false, JsonRequestBehavior.AllowGet);
        }
    }

    [OutputCache(NoStore = true, Duration = 0, VaryByParam = "*")]
    public JsonResult Queues()
    {
        var _xDoc = XDocument.Parse(GetResources("$Resources/Queues"));

        var _queues = (from entry in _xDoc.Descendants(ATOMNS + "entry")
                       select new
                       {
                           Name = entry.Element(ATOMNS + "title").Value,
                           Messages = entry.Element(ATOMNS + "content").Element(SBNS + "QueueDescription").Element(SBNS + "MessageCount").Value
                       }).ToArray();

        return Json(_queues, JsonRequestBehavior.AllowGet);
    }

    private WebClient GetWebClient()
    {
        var _webClient = new WebClient();
        _webClient.Headers[HttpRequestHeader.Authorization] = mToken;
        return _webClient;
    }

    private String GetToken(String issuerName, String issuerSecret)
    {
        var acsEndpoint = "https://" + mServiceNamespace + "-sb." + ACSHOSTNAME + "/WRAPv0.9/";
        var realm = "http://" + mServiceNamespace + "." + SBHOSTNAME + "/";

        var _values = new NameValueCollection();
        _values.Add("wrap_name", issuerName);
        _values.Add("wrap_password", issuerSecret);
        _values.Add("wrap_scope", realm);

        var _webClient = new WebClient();
        byte[] response = _webClient.UploadValues(acsEndpoint, _values);

        var _responseString = Encoding.UTF8.GetString(response);

        var _responseProperties = _responseString.Split('&');
        var _tokenProperty = _responseProperties[0].Split('=');
        var _token = Uri.UnescapeDataString(_tokenProperty[1]);

        return "WRAP access_token=\"" + _token + "\"";
    }

    private String GetResources(String resourceAddress)
    {
 String _fullAddress = mBaseAddress + resourceAddress;
 var _webClient = GetWebClient();
 return _webClient.DownloadString(_fullAddress); ;
    }


}

Send a Message

On the client side using JQuery we add a submit event that will call SendMessage from our controller then update the status and the message count.

$(document).ready(function () {
    
    $("#sendMessage").submit(function (event) {
        event.preventDefault();
        var url = $(this).attr('action');
        var queueName = $("#sendMessageQueue option:selected").val();
        var message = $("#messageToSend").val();
        $(".loading").show();
        $("#create").attr("disabled", "true");
        $("#retrieve").attr("disabled", "true");
        $.post(url, { message: message, queueName: queueName })
            .success(function (response) {
                renderSendMessageStatus(response);
                updateMessageCountOf(queueName, response);
            })
            .error(function () { renderSendMessageStatus(false); });
    });
});


function updateMessageCountOf(queueName, numberOfMessages) {
    var message = numberOfMessages + " Messages";
    if (numberOfMessages == "0") message = "No Messages";
    if (numberOfMessages == "1") message = numberOfMessages + " Message";
    $("div.msgCountOf" + queueName.replace(/ /g, '') + " > label").html(message);
}

function renderSendMessageStatus(response) {
    if (response) $("#sendMessageStatus").html("Message sent successfully!");
    else $("#sendMessageStatus").html("An error occurred, please try again later.");
    $(".loading").hide();
    $("#create").attr("disabled", "");
    $("#retrieve").attr("disabled", "");
}

function updateMessageCountOf(queueName, numberOfMessages) {
    var message = numberOfMessages + " Messages";
    if (numberOfMessages == "0") message = "No Messages";
    if (numberOfMessages == "1") message = numberOfMessages + " Message";
    $("div.msgCountOf" + queueName.replace(/ /g, '') + " > label").html(message);
}

On the server side the SendMessage function will post our message. 
[HttpPost]
public JsonResult SendMessage(String queueName, String message)
{
    var _fullAddress = mBaseAddress + queueName + "/messages" + "?timeout=60";
    var _webClient = GetWebClient();

    _webClient.UploadData(_fullAddress, "POST", Encoding.UTF8.GetBytes(message));

    return Json("1", JsonRequestBehavior.AllowGet);
}

Retrieve a Message

Finally to retrieve a message wee need to select a queue then send it to the server side. I modify this part of the code because I’m not using the brokeredMessage so I don't have all the properties from the original code lab.

$(document).ready(function () {

$("#retrieveMessage").submit(function (event) {
        $("#noMessage").hide();
        $("#message").hide();
        event.preventDefault();
        var url = $(this).attr('action');
        var queueName = $("#retrieveMessageQueue option:selected").val();
        $(".loading").show();
        $("#send").attr("disabled", "true");
        $("#create").attr("disabled", "true");
        $.get(url, { queueName: queueName })
            .success(function (response) {
                renderMessage(response == null ? false : response.MessageInfo);
                updateMessageCountOf(queueName, response == null ? 0 : response.MessagesInQueue);
            })
            .error(function () { renderMessage(false); });
    });
});

function renderMessage(response) {
    if (response) {
        $("#body").html("<p>" + response + "</p>")
        $("#message").show();
    } else {
        $("#noMessage").show();
    }
    $(".loading").hide();
    $("#send").attr("disabled", "");
    $("#create").attr("disabled", "");
}

function updateMessageCountOf(queueName, numberOfMessages) {
    var message = numberOfMessages + " Messages";
    if (numberOfMessages == "0") message = "No Messages";
    if (numberOfMessages == "1") message = numberOfMessages + " Message";
    $("div.msgCountOf" + queueName.replace(/ /g, '') + " > label").html(message);
}

Than back to server side pretty strait forward retrieving the message from “queueName/messages/head"  without all the properties part. 

public JsonResult RetrieveMessage(String queueName)
{
    var _fullAddress = mBaseAddress + queueName + "/messages/head" + "?timeout=60";
    var _webClient = GetWebClient();

    byte[] response = _webClient.UploadData(_fullAddress, "DELETE", new byte[0]);
    var _message = Encoding.UTF8.GetString(response);

    return Json(new { MessageInfo = _message, MessagesInQueue = GetMessageCount(queueName) }, JsonRequestBehavior.AllowGet);
}


private String GetMessageCount(String queueName)
{
    var _xDoc = XDocument.Parse(GetResources("$Resources/Queues"));

    var _cnt = (from entry in _xDoc.Descendants(ATOMNS + "entry")
                where entry.Element(ATOMNS + "title").Value == queueName
                select entry.Element(ATOMNS + "content").Element(SBNS + "QueueDescription").Element(SBNS + "MessageCount").Value).FirstOrDefault();

    return _cnt;
}

Conclusion

This is just a lab so it’s not using all the best practice.  The goal was to try to convert the lab to use REST method… and it works.

~Franky


References

2 comments:

  1. Nice post.

    Note that with the V1.6 release you can specify the ConnectivityMode to HTTP and so use port 80 instead of 9354.

    ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.Http;

    ReplyDelete