SocketRocket and PrivatePub

written by Raphael

At work we're using Ryan Bates' excellent Private Pub gem to push updates from background workers to web clients. Our client wanted an iOS app as well, and since I had recently read about SocketRocket I wondered whether I could keep the current infrastructure and push data to the iOS app too.

The web app consists of a Ruby on Rails 3.2 frontend which uses Resque for long-running background jobs. The results of the Resque jobs are pushed to all connected clients using Private Pub. Pretty simple. However, you need to make some adjustments for SocketRocket and Private Pub to play nicely together.

TL;DR

I've uploaded the source code for a sample Ruby on Rails app as well as a sample implementation for iOS on GitHub. See

You can follow the setup of the Ruby on Rails app using my commits. The Rails App uses the following gems:

The iOS SocketRocket client depends on:

  • SocketRocket - websocket client implementation in Objective-C
  • SBJson - JSON parsing framework to process server responses
  • AFNetworking - to fetch the websocket client configuration

In this post I'll try to outline the basic steps to get you up and running.

Server first - Ruby on Rails configuration & setup

Exposing Private Pub subscription information

Private Pub protects us from unwanted clients by using a signature to authenticate each subscriber. For SocketRocket to talk to Private Pub we first need to get the required authentication information.


class Api::WebsocketsController < ApplicationController
  respond_to :json

  def configuration
    subscription = PrivatePub.subscription channel: params.fetch(:channel) { '/messages/new' }

    respond_with subscription
  end
end

WebsocketsController#configuration serves the Private Pub subscription information to a potential client. Note that this is a very simple example. I did not add API authentication, which could easily be done using devise, or authorization, which could just as easily be done using cancan.
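To make the response a little less magic, here is a minimal Ruby sketch of how such a signature can be derived. It assumes Private Pub builds the signature as a SHA1 hex digest over the secret token, channel and timestamp joined together. That is my reading of the gem, so please verify it against the version you are running. The sign_subscription helper and the secret value are made up for illustration.

```ruby
require 'digest/sha1'

# Hypothetical helper, not part of Private Pub's public API.
# Assumption: signature = SHA1 hex digest of secret_token + channel + timestamp.
def sign_subscription(secret_token, channel, timestamp)
  Digest::SHA1.hexdigest([secret_token, channel, timestamp].join)
end

# Same shape as the JSON served by WebsocketsController#configuration.
subscription = {
  server:    'http://localhost:9292/faye',
  channel:   '/messages/new',
  timestamp: 1334429946136
}
subscription[:signature] = sign_subscription('not-a-real-secret',
                                             subscription[:channel],
                                             subscription[:timestamp])
```

The point is that the client never sees the secret token. It only receives the timestamp and the signature and hands both back to the server when subscribing.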

Add the following to make the WebsocketsController accessible to the outside:


namespace :api do
  match 'websockets/configuration' => 'websockets#configuration', via: :get
end

Testing WebsocketsController#configuration

Once you've started the example application you should be able to test this using curl:


$ curl  http://localhost:3000/api/websockets/configuration.json
{"server":"http://localhost:9292/faye","timestamp":1334429946136,"channel":"/messages/new","signature":"f2f893c52621018409acc634ef540971c48347f4"}

Publishing messages from the rails console

To test our SocketRocket client later on we're going to use rails console to publish messages:


PrivatePub.publish_to '/messages/new', message: 'Hello, World!'

That's all on the Rails side! Details can be reviewed on the project site on GitHub.

iOS: SocketRocket client

I'm going to skip the basic setup because there is enough information available on the respective project sites. I'm assuming you've got an iOS app set up using SBJson, AFNetworking and SocketRocket.

Fetching the Private Pub configuration

First, we need to retrieve the Private Pub subscription information. This is done using AFNetworking. I've left out the part where we create a Private Pub client for now:


NSString *resourceUrl = [NSString stringWithFormat:@"http://localhost:3000/api/websockets/configuration.json?channel=%@", @"/messages/new"];
NSURL *url = [NSURL URLWithString:resourceUrl];
NSURLRequest *request = [NSURLRequest requestWithURL:url];

AFJSONRequestOperation *operation = [AFJSONRequestOperation JSONRequestOperationWithRequest:request
  success:^(NSURLRequest *request, NSHTTPURLResponse *response, id JSON) {
    [self initializePrivatePubClientWithSubscriptionInformation: JSON];
  }
  failure:nil
];

[operation start];

After fetching the Private Pub subscription information I pass it along to the initializePrivatePubClientWithSubscriptionInformation method.

Instantiating a new Private Pub client

The initializePrivatePubClientWithSubscriptionInformation method sets up an SRWebSocket client, as well as a delegate which takes care of the Private Pub connection setup. Please note that this could be wrapped differently - I'm assuming all we need is a consuming client. If this were to be used for bidirectional activity I'd have to wrap it differently.


- (void) initializePrivatePubClientWithSubscriptionInformation: (id) JSON {
  self.websocketDelegate = [[PrivatePubWebSocketDelegate alloc]
      initWithPrivatePubTimestamp: [JSON valueForKeyPath:@"timestamp"]
      andSignature: [JSON valueForKeyPath:@"signature"]
      andChannel:NEW_MESSAGES_CHANNEL];

  NSString *server = [JSON valueForKeyPath:@"server"];
  NSURL *url = [NSURL URLWithString:server];
  NSURLRequest *configurationRequest = [NSURLRequest requestWithURL:url];

  self.websocketClient = [[SRWebSocket alloc] initWithURLRequest:configurationRequest];
  self.websocketClient.delegate = self.websocketDelegate;

  [self.websocketClient open];
}

That's the basic setup. Now let's take a look under the hood to see what PrivatePubWebSocketDelegate has to do for us so we can use Private Pub.

PrivatePubWebSocketDelegate

Before we can receive or publish any messages we need to send two messages to our Private Pub server. Namely:

  • a message to /meta/handshake to introduce us.
  • a message to /meta/subscribe to subscribe to the channel we want to publish to or receive messages from.

For additional logic like connection keep-alive and the client-netiquette we also need to send

  • a message to /meta/disconnect when we exit our client
  • a message to /meta/connect to ping the server
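Since the server side of this post is Ruby, here is a rough Ruby sketch of those four messages as plain hashes. The field names for handshake, subscribe and connect follow the Objective-C snippets in this post. The /meta/disconnect message is my assumption that it follows the same pattern with just a clientId. The BayeuxMessages class itself is hypothetical and only exists for illustration.

```ruby
require 'json'

# Hypothetical builder for the four meta messages described above.
class BayeuxMessages
  def initialize
    @message_id = 0
  end

  def handshake
    envelope('/meta/handshake',
             'version' => 1.0,
             'supportedConnectionTypes' => ['websocket'])
  end

  def subscribe(client_id, channel, timestamp, signature)
    envelope('/meta/subscribe',
             'clientId' => client_id,
             'subscription' => channel,
             'ext' => { 'private_pub_timestamp' => timestamp,
                        'private_pub_signature' => signature })
  end

  def connect(client_id)
    envelope('/meta/connect', 'clientId' => client_id, 'connectionType' => 'websocket')
  end

  # Assumption: disconnect only needs the channel and the clientId.
  def disconnect(client_id)
    envelope('/meta/disconnect', 'clientId' => client_id)
  end

  private

  # Every message carries its channel and an increasing id.
  def envelope(channel, fields)
    @message_id += 1
    { 'channel' => channel, 'id' => @message_id }.merge(fields)
  end
end

messages = BayeuxMessages.new
puts JSON.generate(messages.handshake)
```

Only the handshake can be sent without a clientId. Everything after it needs the id the server handed out.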

Since we can neither receive nor publish messages before our subscription was successful, I've split the setup logic up using a very simple state machine. The state machine is implemented using isa-swizzling, meaning the delegate object swaps its own class at runtime.

Using a state machine gives me a proper separation of concerns: every state handles only the messages it cares about and advances to the next state as soon as its work is done.

I've split up the Private Pub client into the following states:

  • AwaitingHandshakeState which handles the introduction, including assigning a clientId and advancing us into the...
  • SubscriptionState which subscribes us to a single channel. After that we reach the
  • KeepAliveState which periodically sends a keep-alive message to the server. Typically we'd add our custom logic at this stage.
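Before diving into the Objective-C, the three states can be sketched in a few lines of Ruby. This is not how the iOS client is built: it uses a plain state symbol instead of isa-swizzling, the class and method names are made up, and the subscribe message leaves out the signature ext for brevity. It is only meant to show which message moves us from one state to the next.

```ruby
# Hypothetical Ruby rendition of the three states. The Objective-C client
# swaps its class instead; a plain state symbol is used here for brevity.
class PrivatePubStateMachine
  attr_reader :state, :client_id, :outbox, :inbox

  def initialize(channel)
    @channel = channel
    @state   = :awaiting_handshake
    @outbox  = [] # messages that would be written to the socket
    @inbox   = [] # application messages received once subscribed
  end

  # Feed one decoded server message (a Hash) to the current state.
  def receive(message)
    case state
    when :awaiting_handshake then handle_handshake(message)
    when :subscription       then handle_subscription(message)
    when :keep_alive         then inbox << message
    end
    state
  end

  private

  def handle_handshake(message)
    return unless message['channel'] == '/meta/handshake' && message['successful']

    @client_id = message['clientId'] # assigned by the server
    @state = :subscription
    # Signature and timestamp (the "ext" part) are omitted in this sketch.
    outbox << { 'channel' => '/meta/subscribe',
                'clientId' => client_id,
                'subscription' => @channel }
  end

  def handle_subscription(message)
    return unless message['channel'] == '/meta/subscribe' && message['successful']

    @state = :keep_alive
  end
end
```

A failed handshake or subscription simply leaves the machine where it is, which matches what the Objective-C states do.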

PrivatePubWebSocketDelegate & AwaitingHandshakeState

Let's take a look at the entry point for our AwaitingHandshakeState:


  @interface PrivatePubWebSocketDelegate : NSObject <SRWebSocketDelegate>
  // *snip*
  @end

  @implementation PrivatePubWebSocketDelegate
  // *snip*
  #pragma mark - SRWebSocketDelegate Protocol Methods
  // *snip*
  - (void)webSocketDidOpen:(SRWebSocket *)webSocket {
    NSLog(@"WebSocket is open");

    NSDictionary *handshake = [NSDictionary dictionaryWithObjectsAndKeys:
      @"/meta/handshake", @"channel",
      [NSNumber numberWithFloat:1.0], @"version",
      [NSArray arrayWithObjects:@"websocket", nil], @"supportedConnectionTypes",
      [NSNumber numberWithInt:self.messageId++], @"id",
      nil];

    isa = [AwaitingHandshakeState class];

    [webSocket send:[handshake JSONRepresentation]];
  }
  @end

The PrivatePubWebSocketDelegate base class is a simple SRWebSocketDelegate. It's "kind of" a state machine because it changes its own class once the connection has been opened. After the connection has been opened we have to tell Private Pub what kind of client we are. This includes a list of supported connection types:


NSDictionary *handshake = [NSDictionary dictionaryWithObjectsAndKeys:
  @"/meta/handshake", @"channel",
  [NSNumber numberWithFloat:1.0], @"version",
  [NSArray arrayWithObjects:@"websocket", nil], @"supportedConnectionTypes",
  [NSNumber numberWithInt:self.messageId++], @"id",
  nil];
  // *snip*
  [webSocket send:[handshake JSONRepresentation]];

The response tells us if our handshake was successful or not. This logic is handled in the AwaitingHandshakeState:


@implementation AwaitingHandshakeState

- (void) webSocket:(SRWebSocket *)webSocket didReceiveMessage:(NSString *)message {
  id JSON = [[message JSONValue] objectAtIndex:0];

  NSString *channel = [JSON valueForKeyPath:@"channel"];
  if ([channel isEqualToString:@"/meta/handshake"]) {
    Boolean handshakeWasSuccessful = [[JSON valueForKeyPath:@"successful"] boolValue];

    if (handshakeWasSuccessful) {
      self.clientId = [JSON valueForKeyPath:@"clientId"];
      self.webSocket = webSocket;

      isa = [SubscriptionState class];
      [(SubscriptionState *)self sendSubscriptions];
    }
  }
}

@end

First, I'm making sure the received message is an actual response to our handshake: [channel isEqualToString:@"/meta/handshake"]. After that I check whether the handshake was successful: [[JSON valueForKeyPath:@"successful"] boolValue]. If it was, the clientId is stored and I advance the state machine into the SubscriptionState. Note that the clientId must be present in all messages following the handshake, and we can't choose our own. It's assigned by the server.

the SubscriptionState

The SubscriptionState takes care of sending messages to /meta/subscribe to tell Private Pub we are actually interested in a certain channel. It also uses the passed-in signature and timestamp to pass Private Pub's validation. Since we have to take the initiative on this, AwaitingHandshakeState tells us to sendSubscriptions:


  @implementation SubscriptionState

  - (void) sendSubscriptions {
    NSDictionary *ext = [NSDictionary dictionaryWithObjects:[NSArray arrayWithObjects:self.privatePubTimestamp, self.privatePubSignature, nil] forKeys:[NSArray arrayWithObjects:@"private_pub_timestamp", @"private_pub_signature", nil]];

    NSDictionary *subscription = [NSDictionary dictionaryWithObjectsAndKeys:
          @"/meta/subscribe", @"channel",
          self.clientId, @"clientId",
          self.channel, @"subscription",
          [NSNumber numberWithInt:self.messageId++], @"id",
          ext, @"ext",
        nil];

    [self.webSocket send:[subscription JSONRepresentation] ];
  }

  - (void) webSocket:(SRWebSocket *)webSocket didReceiveMessage:(NSString *)message {
    id JSON = [[message JSONValue] objectAtIndex:0];
    NSLog(@"recv subscription %@", JSON);

    Boolean subscriptionWasSuccessful = [[JSON valueForKeyPath:@"successful"] boolValue];
    if (subscriptionWasSuccessful) {
      NSLog(@"Now subscribed to %@", self.channel);

      isa = [KeepAliveState class];

      [(KeepAliveState *)self setupKeepAlive];
    }
  }
  @end

The sendSubscriptions method takes care of publishing our subscription request to /meta/subscribe, while our new webSocket:didReceiveMessage: method checks if the subscription was actually successful. Only once the subscription has been established do we enter our last state: the KeepAliveState.

keeping the connection alive: KeepAliveState

The setupKeepAlive method detaches a new thread which sends a single keep-alive message. I'm assuming that we want to keep the connection open forever. In this case the sendKeepAlive method detaches itself to send another keep-alive message.


  @implementation KeepAliveState

  - (void) setupKeepAlive {
    // *snip*
    [NSThread detachNewThreadSelector:@selector(sendKeepAlive) toTarget:self withObject:nil];
    // *snip*
  }

  - (void) sendKeepAlive {
    sleep(self.timeout);

    NSDictionary *keepAlive = [NSDictionary dictionaryWithObjectsAndKeys:
      self.clientId, @"clientId",
      @"/meta/connect", @"channel",
      @"websocket", @"connectionType",
      [NSNumber numberWithInt:self.messageId++], @"id",
      nil];
    [self.webSocket send: [keepAlive JSONRepresentation]];

    [NSThread detachNewThreadSelector:@selector(sendKeepAlive) toTarget:self withObject:nil];
  }
  @end

Note that I did not implement proper NSThread handling here. It's probably a very bad example and is meant for demonstration purposes only.
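For comparison, here is a rough Ruby sketch of a slightly tidier approach: one long-lived thread that loops and can be stopped, instead of detaching a fresh thread for every ping. The KeepAlive class and the send_message method on the socket are made-up names for this sketch, and a real client would still need error handling and reconnect logic.

```ruby
# Hypothetical keep-alive helper: one thread, one loop, explicit stop.
class KeepAlive
  def initialize(socket, client_id, interval)
    @socket    = socket    # anything responding to #send_message (assumed name)
    @client_id = client_id
    @interval  = interval  # seconds between two pings
    @running   = false
  end

  def start
    @running = true
    @thread = Thread.new do
      while @running
        sleep @interval
        break unless @running # stop was requested while we were sleeping

        @socket.send_message('channel' => '/meta/connect',
                             'clientId' => @client_id,
                             'connectionType' => 'websocket')
      end
    end
  end

  # Ask the loop to finish and wait for the thread to exit.
  def stop
    @running = false
    @thread.join if @thread
  end
end
```

Calling stop waits for at most one interval, because the thread only notices the flag after its current sleep.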

Anyway, that's about it. Let's wrap this up:

  • Our state machine is built using inheritance, with PrivatePubWebSocketDelegate as the superclass
  • every state handles different messages, so every subclass has its own webSocket:didReceiveMessage: method which handles only specific messages
  • the AwaitingHandshakeState is reached first. If the handshake was successful it assigns a clientId
  • the SubscriptionState is reached next. It publishes a subscription request with the Private Pub authentication information and checks if the request was successful.
  • the KeepAliveState is reached last. It uses NSThread to publish a keep-alive message every timeout seconds. This is where we'd want to add our custom Private Pub client-server logic.

I guess this could be properly refactored and released as a separate library, since I have not found any resources on how to write a SocketRocket - Private Pub client so far. Feel free to contact me if you have trouble reproducing this. Also note that you can always try out the sample projects on GitHub.

Happy hacking!

Replacing pow.cx

written by Raphael

EDIT There is an updated version available which does not set up nginx as a system service. Instead HAProxy is used to allow a proper WebSocket setup. If you'd like to follow along, just set up named properly.


Pow is a nice Rack server. You can't work with it while offline but that didn't bother me much. Then it started randomly crashing on me & using 99% of my CPU. That's when I started testing alternative setups to get rid of pow.

A basic, alternative setup should

  • closely resemble my production setup
  • better serve the needs of a web-app with multiple dependencies, e.g. resque, faye
  • gracefully handle errors
  • work well with co-workers still using pow

Since I don't like to change my /etc/hosts file I need some sort of dynamic DNS resolution. Also using unicorn & nginx seemed like a good choice because I'm always using them in production.

Here's the basic setup I'm using right now:

  • use bind for DNS resolution
  • nginx as reverse proxy
  • foreman to start up all necessary services

Instead of bind, dnsmasq should work as well.

To set up bind & nginx I followed this guide, with some small changes. You should read it now, since I'm not going to repeat the steps listed in the guide. I'm only going to show the changes in configuration.

Also note that all I did was install nginx & bind. Nothing more.

changes to bind configuration

I like .dev as my development TLD, so I changed the configuration to use it instead of ld.


# /etc/named.conf
zone "dev" IN {
       type master;
       file "dev.zone";
};

# /var/named/dev.zone
dev. 7200    IN       SOA     dev. root.dev. (
             2008031801 ;    Serial
             15      ; Refresh every 15 minutes
             3600    ; Retry every hour
             3000000 ; Expire after a month+
             86400 ) ; Minimum ttl of 1 day
             IN      NS      dev.
             IN      MX      10 dev.

             IN      A       127.0.0.1
*.dev.        IN      A       127.0.0.1

# /etc/resolver/dev
nameserver 127.0.0.1
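A quick way to check that the wildcard record works is to ask the local name server directly. The following Ruby sketch uses the resolv library from the standard library. The resolves_to_loopback? helper and the myapp.dev host name are made up for illustration.

```ruby
require 'resolv'

# Hypothetical helper: true when +host+ resolves to the loopback address.
# +resolver+ only needs to respond to #getaddress, like Resolv::DNS does.
def resolves_to_loopback?(host, resolver)
  resolver.getaddress(host).to_s == '127.0.0.1'
rescue Resolv::ResolvError
  false
end

# Usage against the local bind instance (requires named to be running):
#   dns = Resolv::DNS.new(nameserver: ['127.0.0.1'])
#   resolves_to_loopback?('myapp.dev', dns)
```

Passing the resolver in keeps the helper easy to try out with a stub when named isn't running.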

changes to nginx homebrew configuration

After removing plenty of stuff, this is my nginx.conf:


# /usr/local/etc/nginx/nginx.conf
worker_processes  1;

error_log  /usr/local/Cellar/nginx/logs/error.log debug;

#pid        logs/nginx.pid;


events {
  worker_connections  128;
}

http {
    include       mime.types;
    access_log  /usr/local/Cellar/nginx/logs/access.log combined;

    sendfile        on;
    keepalive_timeout  65;

    include /usr/local/etc/nginx/sites-enabled/*;
}

Now whenever I deploy a new site for local development I place its nginx configuration in /usr/local/etc/nginx/sites-enabled.

changes to my workflow

Before coding on a project I cd into the application directory and run foreman start to get my application and all of its dependencies up and running.

Only after I've done that I can start working. As soon as I'm done I stop foreman. That's it.

This setup requires more work than I had running pow, but moving from development to production is much easier now since most of the configuration is already done on my local machine. To me this seems to be a good replacement for pow. At least for now.


Migrating Monolith Rails 2.x Apps to Rails 3.x

written by Raphael

This is a follow-up to my post made on 30th October 2011, Migrating Monolith Rails 2.x Apps. I'm going to walk through the basic steps necessary to make rubycas work with existing Devise authentication data, more specifically Devise v1.0.11.

To enable rubycas-server to work with your existing user data we are going to create a custom authenticator:


# encoding: UTF-8
require 'casserver/authenticators/sql'

require 'devise/encryptors/base'
require 'devise/encryptors/sha1'

class CustomAuthenticator < CASServer::Authenticators::SQL
  # snip from devise lib
  def secure_compare(a, b)
    return false unless a.present? && b.present?
    return false unless a.bytesize == b.bytesize
    l = a.unpack "C#{a.bytesize}"

    res = 0
    b.each_byte { |byte| res |= byte ^ l.shift }
    res == 0
  end

  # copied from devise.rb initializer
  DEVISE_STRETCHES = 7
  DEVISE_PEPPER = 'my-devise-pepper'

  def password_digest(password, password_salt)
    Devise::Encryptors::Sha1.digest(password, DEVISE_STRETCHES, password_salt, DEVISE_PEPPER)
  end

  def valid_for_authentication?(user, incoming_password)
    secure_compare(password_digest(incoming_password,user.password_salt), user.encrypted_password)
  end

  def validate(credentials)
    read_standard_credentials(credentials)
    raise_if_not_configured

    user_model = self.class.user_model

    username_column = @options[:username_column] || "username"

    $LOG.debug "#{self.class}: [#{user_model}] " + "Connection pool size: #{user_model.connection_pool.instance_variable_get(:@checked_out).length}/#{user_model.connection_pool.instance_variable_get(:@connections).length}"
    results = user_model.find(:all, :conditions => ["#{username_column} = ?", @username])
    user_model.connection_pool.checkin(user_model.connection)

    if results.size > 0
      $LOG.warn("Multiple matches found for user '#{@username}'") if results.size > 1
      user = results.first
      unless @options[:extra_attributes].blank?
        if results.size > 1
          $LOG.warn("#{self.class}: Unable to extract extra_attributes because multiple matches were found for #{@username.inspect}")
        else
          extract_extra(user)
          log_extra
        end
      end
      return valid_for_authentication? user, @password
    else
      return false
    end
  end
end

All I did above was extract the valid_for_authentication? method from Devise 1.0.11 into a rubycas authentication class.
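If you want to see what the check boils down to without loading Devise at all, here is a standalone Ruby sketch. It assumes the SHA1 encryptor repeatedly hashes salt, previous digest, password and pepper joined by "--", which is how I understand Devise 1.0.x to work. Please compare it with Devise::Encryptors::Sha1 in your installed version before relying on it. The method names are made up.

```ruby
require 'digest/sha1'

# Assumption: mirrors my understanding of Devise 1.0.x's SHA1 encryptor.
def sha1_digest(password, stretches, salt, pepper)
  digest = pepper
  stretches.times do
    digest = Digest::SHA1.hexdigest("--#{[salt, digest, password, pepper].join('--')}--")
  end
  digest
end

# Constant-time comparison, same idea as secure_compare above:
# every byte is inspected, so timing does not reveal the first mismatch.
def constant_time_equal?(a, b)
  return false unless a.bytesize == b.bytesize

  diff = 0
  a.bytes.zip(b.bytes) { |x, y| diff |= x ^ y }
  diff.zero?
end

stored = sha1_digest('s3cret', 7, 'some-salt', 'my-devise-pepper')
puts constant_time_equal?(stored, sha1_digest('s3cret', 7, 'some-salt', 'my-devise-pepper'))
puts constant_time_equal?(stored, sha1_digest('wrong', 7, 'some-salt', 'my-devise-pepper'))
```

The stored value plays the role of user.encrypted_password, and the salt that of user.password_salt.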

Ideally we should keep the pepper and stretches configuration in a separate file. Requiring your devise.rb initializer, however, does not work for Devise 1.0.11 because it assumes you are running a Ruby on Rails application, which is not the case here. I'd probably put it into a YAML file and load it on demand. The implementation for that is simple, so I skipped it here.
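A minimal sketch of that YAML idea could look like this. The DeviseSettings module, the file name and the key names are all assumptions made up for this example.

```ruby
require 'yaml'

# Hypothetical loader; the file name and keys are assumptions for this sketch.
module DeviseSettings
  def self.load(path)
    settings = YAML.load_file(path)
    # fetch raises KeyError when a key is missing, so typos fail loudly.
    { stretches: Integer(settings.fetch('stretches')),
      pepper:    settings.fetch('pepper') }
  end
end

# devise_settings.yml might contain:
#   stretches: 7
#   pepper: my-devise-pepper
#
# settings = DeviseSettings.load('devise_settings.yml')
# settings[:stretches] and settings[:pepper] would then replace the
# DEVISE_STRETCHES and DEVISE_PEPPER constants in the authenticator.
```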

Please note that the above example assumes you configured Devise to use the SHA1 hashing algorithm. An example snippet from your devise.rb initializer could look like this:


Devise.setup do |config|
  # snip
  config.pepper = "my-devise-pepper"
  config.stretches = 7
  config.encryptor = :sha1
  # snip
end

Now all you need to do to use your custom authenticator is to update your config.yml:


authenticator:
  class: CustomAuthenticator
  database:
    adapter: mysql
    database: rails_app_database
    username: username
    password: password
    host: localhost
  user_table: users
  username_column: email

Then restart your rubycas server. You should now be able to log in using your existing user data.

Note

The changes necessary to support more recent versions of Devise should be similar. I have not checked yet, but might do so in the future. I might update this post with another example for Devise 2.0.

Next Up:

Redirect all log-in requests on the old Rails app to use our newly set up rubycas-server