
MOM and JMS primer (ActiveMQ example)

I’ve recently been reading a very good book on Apache ActiveMQ, a message-oriented middleware (MOM) broker that is an open source implementation of the Java Message Service (JMS) specification. It makes for a reliable hub in any message-oriented enterprise application and integrates well with Java EE containers, ESBs, and other JMS providers.

The book “ActiveMQ in Action” (authored by Bruce Snyder, Dejan Bosanac, and Rob Davies) starts from the anatomy of a JMS message and moves quickly through connectors, message persistence, authentication, and authorization.

Below are a few notes I made while reading:

 

Standard Message Headers:

JMSDestination

  • Header Field: JMSDestination
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: where to send the message
  • Example: queue://mycompany.1.0.notifications.rs.queue

JMSDeliveryMode

  • Header Field: JMSDeliveryMode
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: persistent or non-persistent*
  • Example: Persistent

JMSExpiration

  • Header Field: JMSExpiration
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: when should the message expire (in ms)
  • Example: 0 (never)

JMSPriority

  • Header Field: JMSPriority
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: priority 0-9 (0-4 normal, 5-9 expedited)
  • Example: 4 (standard)

JMSMessageID

  • Header Field: JMSMessageID
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: identifier of the message
  • Example: ID:POL-MPRZYDATEK-63606-1370265295842-1:2:1:1:1

JMSTimestamp

  • Header Field: JMSTimestamp
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: time the message was handed off to the provider to be sent
  • Example: 2013-06-03 15:15:27:196 CEST

JMSCorrelationID

  • Header Field: JMSCorrelationID
  • Set By: JMS Client
  • Set mode: manual
  • Meaning: link one message with another (e.g. in a conversation-style message exchange)

JMSReplyTo

  • Header Field: JMSReplyTo
  • Set By: JMS Client
  • Set mode: manual
  • Meaning: the destination where the reply is expected

JMSType

  • Header Field: JMSType
  • Set By: JMS Client
  • Set mode: manual
  • Meaning: used by a few vendors to semantically identify the message type

JMSRedelivered

  • Header Field: JMSRedelivered
  • Set By: JMS provider
  • Set mode: auto
  • Meaning: if set to true, it is likely that the message was delivered earlier but not acknowledged
  • Example: false

* Persistent messages are intended to survive system failures of the JMS provider (the message server). They are written to disk as soon as the message server receives them from the JMS client. Once a message is persisted to disk, the message server can attempt to deliver it to its intended consumer. As the message server delivers the message to the consumers, it keeps track of which consumers successfully receive it. If the JMS provider fails while delivering the message, the message server will pick up where it left off following a recovery. Persistent messages are delivered once-and-only-once. They incur more overhead due to the need to store the message, and value reliability over performance.
* Nonpersistent messages are not written to disk when they are received by the message server, so if the JMS provider fails, the message will be lost. In general, nonpersistent messages perform better than persistent messages. They are delivered more quickly and require fewer system resources on the message server. However, nonpersistent messages should only be used when the loss of messages due to a JMS provider failure is not an issue.
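
The expiration rule and the priority bands from the header notes above can be sketched in plain Java. This is only an illustration with no JMS dependency: the class and method names are made up for this post, and it assumes the usual JMS convention that the expiration is the send time plus the producer's time-to-live, with 0 meaning "never expires".

```java
// Plain-Java illustration of two header rules; HeaderRules is a hypothetical
// helper and not part of the JMS API.
final class HeaderRules {

    // JMSExpiration = send time + time-to-live; a time-to-live of 0 yields 0 (never expires).
    static long expiration(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // An expiration of 0 never expires; otherwise the message is expired once "now" reaches it.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    // Priorities 0-4 are gradations of normal, 5-9 gradations of expedited.
    static String priorityBand(int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("JMSPriority must be between 0 and 9");
        }
        return priority <= 4 ? "normal" : "expedited";
    }

    public static void main(String[] args) {
        long sentAt = 1_000L;
        System.out.println(expiration(sentAt, 0L));      // 0
        System.out.println(expiration(sentAt, 5_000L));  // 6000
        System.out.println(priorityBand(4));             // normal
    }
}
```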

 

Request/reply messaging pattern

  • is an asynchronous back-and-forth conversational pattern utilizing either the PTP domain or the pub/sub domain through a combination of the JMSReplyTo and JMSCorrelationID message headers and temporary destinations.
  • The JMSReplyTo specifies the destination where a reply should be sent, and the JMSCorrelationID in the reply message specifies the JMSMessageID of the request message.
  • These headers are used to link the reply message(s) to the original request message.
  • Temporary destinations are those that are created only for the duration of a connection and can only be consumed from by the connection that created them.
  • These restrictions make temporary destinations useful for request/reply.
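
The correlation step described above can be sketched without a broker. The code below is an in-memory illustration only: the Msg class is a hypothetical stand-in for a JMS message, and the map of pending requests plays the role of the requester waiting on its temporary reply destination.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// In-memory illustration of request/reply correlation; no broker or JMS API involved.
final class RequestReplySketch {

    // Hypothetical stand-in for a JMS message carrying only the relevant headers.
    static final class Msg {
        final String messageId;      // plays the role of JMSMessageID
        final String correlationId;  // plays the role of JMSCorrelationID
        final String body;

        Msg(String messageId, String correlationId, String body) {
            this.messageId = messageId;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Requests still waiting for a reply, keyed by the request's message id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Requester side: create a request and remember where its reply should go.
    Msg newRequest(String body, CompletableFuture<String> replyHolder) {
        Msg request = new Msg("ID:" + UUID.randomUUID(), null, body);
        pending.put(request.messageId, replyHolder);
        return request;
    }

    // Replier side: the reply copies the request's message id into its correlation id.
    static Msg replyTo(Msg request, String body) {
        return new Msg("ID:" + UUID.randomUUID(), request.messageId, body);
    }

    // Requester side: match a reply to its pending request; returns false if none matches.
    boolean onReply(Msg reply) {
        if (reply.correlationId == null) {
            return false;
        }
        CompletableFuture<String> holder = pending.remove(reply.correlationId);
        if (holder == null) {
            return false;
        }
        holder.complete(reply.body);
        return true;
    }

    public static void main(String[] args) {
        RequestReplySketch requester = new RequestReplySketch();
        CompletableFuture<String> holder = new CompletableFuture<>();
        Msg request = requester.newRequest("ping", holder);
        requester.onReply(replyTo(request, "pong"));
        System.out.println(holder.getNow("no reply yet"));  // pong
    }
}
```

With a real provider, the request would also carry JMSReplyTo pointing at a temporary queue, and the replier would send its reply there.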

 

Distinguishing message durability from message persistence

  • Message durability can only be achieved with the pub/sub domain. When clients connect to a topic, they can do so using a durable or a nondurable subscription
    • Durable subscription—A durable subscription is infinite. It’s registered with the topic subscription to tell the JMS provider to preserve the subscription state in the event that the subscriber disconnects. If a durable subscriber disconnects the JMS provider will hold all messages until that subscriber connects again or until the subscriber explicitly unsubscribes from the topic.
    • Nondurable subscription—A nondurable subscription is finite. It’s registered with the topic subscription to tell the JMS provider to not preserve the subscription state in the event that the subscriber disconnects. If a subscriber disconnects, the JMS provider won’t hold any messages during the disconnection period
  • Message persistence is independent of the message domain. Message persistence is a quality of service property used to indicate the JMS application’s ability to handle missing messages in the event of a JMS provider failure.
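
To make the difference between the two subscription types concrete, here is a small in-memory sketch. It is not the JMS API: TopicSketch and its methods are hypothetical, and the only behaviour it models is that a durable subscription survives a disconnect (so messages pile up for it), while a nondurable one is dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of durable vs. nondurable topic subscriptions; not the JMS API.
final class TopicSketch {

    private static final class Subscription {
        final boolean durable;
        final List<String> backlog = new ArrayList<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new HashMap<>();

    // A durable subscriber that reconnects picks up its existing subscription.
    void connect(String name, boolean durable) {
        subscriptions.computeIfAbsent(name, n -> new Subscription(durable));
    }

    // Durable subscriptions are preserved on disconnect; nondurable ones are removed.
    void disconnect(String name) {
        Subscription s = subscriptions.get(name);
        if (s != null && !s.durable) {
            subscriptions.remove(name);
        }
    }

    // Every subscription known to the topic gets the message, connected or not.
    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            s.backlog.add(message);
        }
    }

    List<String> receiveAll(String name) {
        Subscription s = subscriptions.get(name);
        if (s == null) {
            return new ArrayList<>();
        }
        List<String> delivered = new ArrayList<>(s.backlog);
        s.backlog.clear();
        return delivered;
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.connect("audit", true);        // durable
        topic.connect("dashboard", false);   // nondurable
        topic.disconnect("audit");
        topic.disconnect("dashboard");
        topic.publish("price update");       // published while both are offline
        topic.connect("audit", true);
        topic.connect("dashboard", false);
        System.out.println(topic.receiveAll("audit"));      // [price update]
        System.out.println(topic.receiveAll("dashboard"));  // []
    }
}
```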

 

Four types of message persistence:

  • KahaDB
    • the recommended message store for general-purpose messages since ActiveMQ version 5.3
    • a file-based message store that combines a transactional journal, for reliable message storage and recovery, with good performance and scalability
    • the structure of the KahaDB store has been streamlined especially for the requirements of a message broker.
    • The KahaDB message store uses a transactional log for its indexes and only uses one index file for all its destinations.
    • It’s been used in production environments with 10,000 active connections, each connection having a separate queue
  • AMQ
    • like KahaDB, is a combination of a transactional journal for reliable persistence (to survive system crashes) and high-performance indexes, which makes this store the best option when message throughput is the main requirement for an application.
    • But because it uses two separate files for every index, and there’s an index per destination, the AMQ message store shouldn’t be used if you intend to use thousands of queues per broker.
    • Also, recovery can be slow if the ActiveMQ broker isn’t shut down cleanly. This is because all the indexes need to be rebuilt, which requires the broker to traverse all its data logs to accurately build the indexes again.
  • JDBC
    • JDBC persistence does not outperform either KahaDB or AMQ
    • the use of a shared database is particularly useful for making a redundant master/slave topology out of multiple brokers. When a group of ActiveMQ brokers is configured to use a shared database, they’ll all try to connect and grab a lock in the lock table, but only one will succeed and become the master. The remaining brokers will be slaves, and will be in a wait state, not accepting client connections until the master fails.
  • memory
    • memory message store holds all persistent messages in memory. No active caching is involved, so you have to be careful that both the JVM and the memory limits you set for the broker are large enough to accommodate all the messages that may exist in this message store at one time.
    • The memory message store can be useful if you know that the broker will only store a finite number of messages, which will typically be consumed quickly

 

Caching messages in the broker for consumers

  • Although one of the most important aspects of message persistence is that the messages will survive in long-term storage, there are a number of cases where messages are required to be available for consumers that were disconnected from the broker, but persisting the messages in a data store is too slow
  • For these cases, ActiveMQ can cache messages in the broker using a subscription recovery policy. This configurable policy decides which types of messages are cached, how many, and for how long
  • subscription recovery policies:
    • fixed (memory) size subscription recovery policy
    • fixed count (messages) subscription recovery policy
    • query-based subscription recovery policy – Caches only messages that match the query
    • timed subscription recovery policy (the time in milliseconds to keep messages in the cache)
    • last image subscription recovery policy (only the last message)
    • no subscription recovery policy (disables message caching for topics)
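
The idea behind the fixed count policy can be sketched as a bounded buffer of the most recent messages that is replayed to a subscriber when it connects. This is an illustration only: FixedCountCache is a hypothetical class and does not use ActiveMQ's own policy classes or configuration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration of a fixed-count recovery cache; ActiveMQ's own classes are not used here.
final class FixedCountCache {

    private final int maxMessages;
    private final Deque<String> recent = new ArrayDeque<>();

    FixedCountCache(int maxMessages) {
        if (maxMessages < 1) {
            throw new IllegalArgumentException("maxMessages must be at least 1");
        }
        this.maxMessages = maxMessages;
    }

    // Called for every message published to the topic; evicts the oldest one when full.
    void add(String message) {
        if (recent.size() == maxMessages) {
            recent.removeFirst();
        }
        recent.addLast(message);
    }

    // Called when a subscriber connects: returns the cached messages, oldest first.
    List<String> recover() {
        return new ArrayList<>(recent);
    }

    public static void main(String[] args) {
        FixedCountCache cache = new FixedCountCache(2);
        cache.add("a");
        cache.add("b");
        cache.add("c");
        System.out.println(cache.recover());  // [b, c]
    }
}
```

With a limit of 1 the same structure behaves like the last image policy, since only the most recent message is kept.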

 

Authentication

  • Simple authentication plug-in: handles credentials directly in the XML configuration file or in a properties file
  • JAAS authentication plug-in: implements the JAAS API and provides a more powerful and customizable authentication solution. JAAS provides pluggable authentication, which means ActiveMQ will use the same authentication API regardless of the technique used to verify user credentials (a text file, a relational database, LDAP, and so on)
  • custom plug-in for handling security (BrokerFilter implementation)

 

Authorization

  • operation-level
    • Read—The ability to receive messages from the destination
    • Write—The ability to send messages to the destination
    • Admin—The ability to administer the destination
  • message-level
    • remote address
    • message headers

 

Hope you find this short summary useful.

 

Disabling SSL Certificate Validation

Recently I came across two SSL-related exceptions when writing a Spring-based test client application using RestTemplate. Because both of them are caused by an untrusted connection when making HTTPS calls (a result of the self-signed SSL certificate configured in Tomcat), and both have one common solution, I thought I’d share it with you.

 

Exceptions thrown:

  • java.security.cert.CertificateException: No name matching my.company.com found; nested exception is javax.net.ssl.SSLHandshakeException
  • sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Solution:

  • disabling SSL certificate validation (unless you want to go the “hard way” of installing a proper certificate signed by a certificate authority)
  • This is how I did it in Java:

 

import javax.net.ssl.*;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

public class SSLCertificateValidation {

    public static void disable() {
        try {
            SSLContext sslc = SSLContext.getInstance("TLS");
            TrustManager[] trustManagerArray = { new NullX509TrustManager() };
            sslc.init(null, trustManagerArray, null);
            HttpsURLConnection.setDefaultSSLSocketFactory(sslc.getSocketFactory());
            HttpsURLConnection.setDefaultHostnameVerifier(new NullHostnameVerifier());
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

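    // Trust manager that accepts every certificate chain without any checks.
    private static class NullX509TrustManager implements X509TrustManager {
        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Intentionally empty: never throws, so every client certificate is accepted.
        }
        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Intentionally empty: never throws, so every server certificate is accepted.
        }
        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    }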

    private static class NullHostnameVerifier implements HostnameVerifier {
        public boolean verify(String hostname, SSLSession session) {
            return true;
        }
    }

}

The complete example is available as a Gist.

 

All that remains is to invoke SSLCertificateValidation.disable() before making an HTTPS call using RestTemplate. Simple as that.
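
If accepting every hostname feels too broad even for a test client, the hostname verifier can be narrowed to a known list of hosts. The sketch below is one possible variant and not part of the original Gist: AllowListHostnameVerifier is a hypothetical class, and it relies on the fact that HttpsURLConnection only consults the verifier when the default hostname check has already failed.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;

// Accepts a hostname mismatch only for explicitly listed (test) hosts.
final class AllowListHostnameVerifier implements HostnameVerifier {

    private final Set<String> allowedHosts;

    AllowListHostnameVerifier(Set<String> allowedHosts) {
        // Normalize to lower case so the comparison is case-insensitive.
        Set<String> normalized = new HashSet<>();
        for (String host : allowedHosts) {
            normalized.add(host.toLowerCase(Locale.ROOT));
        }
        this.allowedHosts = Collections.unmodifiableSet(normalized);
    }

    @Override
    public boolean verify(String hostname, SSLSession session) {
        // The session is ignored in this sketch; only the requested hostname is checked.
        return hostname != null && allowedHosts.contains(hostname.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        HostnameVerifier verifier =
                new AllowListHostnameVerifier(Collections.singleton("my.company.com"));
        System.out.println(verifier.verify("my.company.com", null));    // true
        System.out.println(verifier.verify("other.example.org", null)); // false
    }
}
```

It would be installed the same way as the NullHostnameVerifier, by passing an instance to HttpsURLConnection.setDefaultHostnameVerifier(...).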

Producing JWT tokens

To produce the JWT token I’ll be using the Nimbus JOSE+JWT Java library, which implements the JavaScript Object Signing and Encryption (JOSE) suite of specifications as well as the closely related JSON Web Token (JWT) specification.

Technologies used:

  • Apache Maven 3.0.5
  • Nimbus JOSE+JWT 2.16
  • Java 7

 

First, let’s add the Maven dependency for the Nimbus JOSE+JWT library:

<dependency>
    <groupId>com.nimbusds</groupId>
    <artifactId>nimbus-jose-jwt</artifactId>
    <version>2.16</version>
</dependency>

 

…and start composing the JWT reserved claims right away:

JWTClaimsSet jwtClaims = new JWTClaimsSet();
jwtClaims.setIssuer("https://my-auth-server.com");
jwtClaims.setSubject("Mariusz");
List<String> aud = new ArrayList<>();
aud.add("https://my-web-app.com");
aud.add("https://your-web-app.com");
jwtClaims.setAudience(aud);
jwtClaims.setExpirationTime(new Date(new Date().getTime() + 1000*60*10));
jwtClaims.setNotBeforeTime(new Date());
jwtClaims.setIssueTime(new Date());
jwtClaims.setJWTID(UUID.randomUUID().toString());

As you can see, we’re setting all of the Reserved Claim Names mentioned in my earlier post on JWT (i.e. Issuer, Subject, Audience, Expiration Time (set to 10 minutes from now), Not Before Time, Issued At Time and the JWT ID), and using a random UUID as the identifier of the token.

 

When you print out the claims set above, you should see something similar to this:

{
    "exp":1373625160,
    "sub":"Mariusz",
    "nbf":1373624561,
    "aud":[
        "https:\/\/my-web-app.com",
        "https:\/\/your-web-app.com"
    ],
    "iss":"https:\/\/my-auth-server.com",
    "jti":"c79772ea-8777-44dc-a0fe-9001aeee9d02",
    "iat":1373624561
}
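
Note that the exp, nbf and iat values in the printout are expressed in seconds since the Unix epoch, whereas java.util.Date works in milliseconds. A small JDK-only sketch of the conversion follows. The NumericDates helper is hypothetical (the library does this conversion for you), and the java.time call in main assumes Java 8 or newer.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helper converting between java.util.Date and JWT time claim values.
final class NumericDates {

    // Date (milliseconds since the epoch) -> claim value (whole seconds since the epoch).
    static long toNumericDate(Date date) {
        return date.getTime() / 1000L;
    }

    // Claim value (seconds since the epoch) -> Date.
    static Date fromNumericDate(long secondsSinceEpoch) {
        return new Date(secondsSinceEpoch * 1000L);
    }

    public static void main(String[] args) {
        // The "exp" value from the printout above, shown in UTC.
        System.out.println(Instant.ofEpochSecond(1373625160L));  // 2013-07-12T10:32:40Z
        // Difference between "exp" and "iat" in seconds: just under ten minutes.
        System.out.println(1373625160L - 1373624561L);           // 599
    }
}
```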

 

now, let’s create the JWE header and specify RSA-OAEP as the encryption algorithm and 128-bit AES/GCM as the encryption method that will be used to protect the JWT token:

JWEHeader header = new JWEHeader(
    JWEAlgorithm.RSA_OAEP,
    EncryptionMethod.A128GCM
);

 

next create the EncryptedJWT object that will be later used to perform the RSA encryption:

EncryptedJWT jwt = new EncryptedJWT(header, jwtClaims);

 

…create an RSA encrypter with the specified public RSA key:

RSAEncrypter encrypter = new RSAEncrypter(publicRsaKey);

(for details on how to generate RSA keys, please read my post on “RSA Keys Generation”)
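
For completeness, here is a minimal JDK-only sketch of obtaining an RSA key pair to play with. It is not necessarily how the referenced post does it: the RsaKeys class and the 2048-bit key size are my assumptions, and it assumes the encrypter and decrypter accept the JDK's RSAPublicKey and RSAPrivateKey types.

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;

// Hypothetical helper that generates a throwaway RSA key pair with the JDK.
final class RsaKeys {

    static KeyPair generate(int bits) {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(bits);
            return generator.generateKeyPair();
        } catch (NoSuchAlgorithmException e) {
            // Every standard Java platform is required to support RSA key generation.
            throw new IllegalStateException("RSA key generation is not available", e);
        }
    }

    public static void main(String[] args) {
        KeyPair pair = generate(2048);
        RSAPublicKey publicRsaKey = (RSAPublicKey) pair.getPublic();
        RSAPrivateKey privateRsaKey = (RSAPrivateKey) pair.getPrivate();
        System.out.println(publicRsaKey.getModulus().bitLength());  // 2048
        System.out.println(privateRsaKey.getAlgorithm());           // RSA
    }
}
```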

 

and do the actual encryption:

jwt.encrypt(encrypter);

 

finally, we can serialize to JWT compact form in order to print it out to the screen nicely:

String jwtString = jwt.serialize();

 

what you should see after performing the steps above is something similar to this:

eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkExMjhHQ00ifQ.ZyVpnDsmei
R_krnjTvMkB7a-DJrgdvwzXsMImIUS6x7B3yLEH5igpfiGfMD79SqYW5P
Fd7PrXVvNhq4Gs0YSg8qPPlPCjmaW7OnR9Oi891PRL1PyF0HqGzJJacZI
uu_jbeY0MQ9Z3hzNcuivhak60YFWLlGQWA7l4e7tkX4Hs4.fKF7TxNXc_
TmDl_P.AEPF230Ib8AeioJ6A4Kg0YXbYjaO4O4LVBu6qHQN1BP7ri_jc5
uCcIe02oFNEXoJZnSlaZP84LOZcnloNX6JBrLQnDr90jxkeDcoyiiLoxC
nebYJIksqyvxsGOvsAMS7MUt1Ms3Ua7tBv5pft0YVvIY9CK0oPdEyCAiu
vBp6KOR4Y9xkTy1xev5SUcWQjmskUlqtLnsO7mXpsMI09xTq13FgM2fTS
C5MIXFx2un8n8esh_rFMIfTlqLky1oa7dvb28ICjbYZPEq4CpCOeeMcQC
KliSy6A.zUGNd9GHWAY0m_m7xrQOOg
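
The string above consists of five Base64URL-encoded parts separated by dots: the header, the encrypted key, the initialization vector, the ciphertext and the authentication tag. Only the header is readable without the private key. The JDK-only sketch below splits such a string and decodes the header. CompactJwe is a hypothetical helper, java.util.Base64 assumes Java 8 or newer, and the last four segments of the sample token are shortened placeholders rather than real data.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for inspecting a JWE in compact serialization.
final class CompactJwe {

    // Splits into: header, encrypted key, initialization vector, ciphertext, authentication tag.
    static String[] parts(String compact) {
        String[] parts = compact.split("\\.", -1);
        if (parts.length != 5) {
            throw new IllegalArgumentException("expected 5 parts, got " + parts.length);
        }
        return parts;
    }

    // Decodes the first part, which is plain (unencrypted) JSON.
    static String headerJson(String compact) {
        byte[] decoded = Base64.getUrlDecoder().decode(parts(compact)[0]);
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First segment copied from the token above; the remaining four are placeholders.
        String token = "eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkExMjhHQ00ifQ.a2V5.aXY.Y3Q.dGFn";
        System.out.println(headerJson(token));  // {"alg":"RSA-OAEP","enc":"A128GCM"}
    }
}
```

The decoded header matches the algorithm and encryption method chosen when the JWEHeader was created.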

 

Now, if you’d like to read the data back from the token using your private RSA key, you’d have to do the following:

parse the above JWT string using EncryptedJWT object:

EncryptedJWT jwt = EncryptedJWT.parse(jwtString);

 

create a decrypter with the specified private RSA key:

RSADecrypter decrypter = new RSADecrypter(privateRsaKey);

 

do the decryption:

jwt.decrypt(decrypter);

 

and print out the claims:

System.out.println("iss: " + jwt.getJWTClaimsSet().getIssuer());
System.out.println("sub: " + jwt.getJWTClaimsSet().getSubject());
System.out.println("aud: " + jwt.getJWTClaimsSet().getAudience().size());
System.out.println("exp: " + jwt.getJWTClaimsSet().getExpirationTime());
System.out.println("nbf: " + jwt.getJWTClaimsSet().getNotBeforeTime());
System.out.println("iat: " + jwt.getJWTClaimsSet().getIssueTime());
System.out.println("jti: " + jwt.getJWTClaimsSet().getJWTID());

 

resulting in the following output:

iss: https://my-auth-server.com
sub: Mariusz
aud: 2
exp: Fri Jul 12 12:32:40 CEST 2013
nbf: Fri Jul 12 12:22:41 CEST 2013
iat: Fri Jul 12 12:22:41 CEST 2013
jti: c79772ea-8777-44dc-a0fe-9001aeee9d02

 

 

If you’re interested in the complete source code of this example, please clone the Gist available on my GitHub account.

 

 

 

Sources: