Saturday, April 14, 2012

Windows Azure Queue Sample


You can use azure queue for backend processing. Messages are removed after 1week or so. Azure has triplicate copy of your messages. Their promise is "you will not lose a message", but they should not stay more than 1week. You can access queue from multiple consumers or producer points. Message becomes invisible when consumer gets a message. If they don't delete the message, this message will be accessible after 30seconds as default. This logic makes sense for concurrent access.

Typical usage: Consumer end gets a message and does some work on it. After they complete the work, process should call the delete message. If another process took that message, this process will not have valid "popreceipt" to delete that. If you process is "idempotent", you can reprocess the message. If it is not possible to do that, you need to keep the status for this message at somewhere else.

Here is some simple example code to create a message , peek messages, get message and delete message at Azure Queue.

First, we need to get the client for queue. You can store this client as a reference or regenerate for each call.



 public CloudQueueClient GetQueueClient()
        {
            var accountBlob = ConfigurationManager.AppSettings["AzureBlobStorage"];

            var account = CloudStorageAccount.Parse(accountBlob.ToString());


            return account.CreateCloudQueueClient();

        }

It is best to have interface for your message. Here, we have class and interface for our simple message. You can add custom attributes to your queue metadata. It needs to be within 8kb.



 /// <summary>
    /// Metadata to attach to new queues for dynamic setup
    /// Link queue to specific types if we need
    /// </summary>
    public interface IQueueMetaData
    {
         DateTime CreatedOn { get; set; }
    }

    public class QMessage:IMessage
    {
        public IQueueMetaData QueueMetaData { get; set; }
        public string QueueName { get; set; }
        public string MessageId { get; set; }
        public string PopReceipt { get; set; }
        
        public string SomeMsg { get; set; }
        public override string ToString()
        {

            return   SomeMsg;
        }

        public string Display()
        {
            return "Id:" + MessageId + " pop:" + PopReceipt + " txt:" + SomeMsg;
        }
    }

    public interface IMessage
    {
        IQueueMetaData QueueMetaData { get; set; }
        string QueueName { get; set; }
        string MessageId { get; set; }
        string PopReceipt { get; set; }
        string Display();

    }

Our method to add a message object. You can serialize your object or just write some recordid to work on.


 public void AddMessage(IMessage message)
        {
            // get client to interact with service
            var client = GetQueueClient();
            //get queue object
            CloudQueue cloudQueue = client.GetQueueReference(message.QueueName);
            //create that if we dont have it, so we can hve dynamic queue
            
            if (!cloudQueue.Exists())
            {
                cloudQueue.Create();
                cloudQueue.FetchAttributes();
                //you can add metadata
                cloudQueue.Metadata.Add("CreatedOn", DateTime.UtcNow.ToString());
                cloudQueue.SetMetadata();
                message.QueueMetaData.CreatedOn = DateTime.UtcNow;
            }
            //set message
            CloudQueueMessage qmessage = new CloudQueueMessage(message.ToString());
            cloudQueue.AddMessage(qmessage);

        }





        public IEnumerable<IMessage>  PeekMessages(string q,int n)
        {
             // get client to interact with service
            var client = GetQueueClient();
            //get queue object
            CloudQueue cloudQueue = client.GetQueueReference(q);
            //create that if we dont have it, so we can hve dynamic queue

            if (cloudQueue.Exists())
            {
                return cloudQueue.PeekMessages(n).Select((t) =>
                                                              {
                                                                  return new QMessage()
                                                                             {
                                                                                 MessageId = t.Id,
                                                                                 PopReceipt = t.PopReceipt,
                                                                                 SomeMsg = t.AsString
                                                                             };
                                                              });
            }
            return null;
        }


        public IMessage GetMessage(string q)
        {
            // get client to interact with service
            var client = GetQueueClient();
            //get queue object
            CloudQueue cloudQueue = client.GetQueueReference(q);
            //create that if we dont have it, so we can hve dynamic queue

            if (cloudQueue.Exists())
            {
                var cloudqmessage = cloudQueue.GetMessage();
                return new QMessage()
                           {
                               MessageId = cloudqmessage.Id,
                               PopReceipt = cloudqmessage.PopReceipt,
                               SomeMsg = cloudqmessage.AsString
                           };
            }
            return null;
        }


        public bool DeleteMessage(IMessage message)
        {
            try
            {
                //it may not delete that if something else get that before and doing work on it
                // get client to interact with service
                var client = GetQueueClient();
                //get queue object
                CloudQueue cloudQueue = client.GetQueueReference(message.QueueName);
                if (cloudQueue.Exists())
                {
                    cloudQueue.DeleteMessage(message.MessageId, message.PopReceipt);
                    return true;
                }
            }catch(Exception ex)
            {
                //log
            }
            return false;
        }
        

No comments:

Post a Comment

Hey!
Let me know what you think?