Storm Spout Coordinator – IllegalStateException – Deactivate / Activate Topology

Recently, while working with transactional topologies in Storm 0.9.4, we came across this error in the spout:

java.lang.IllegalStateException: Expecting previous txid state to be the previous transaction

We were a little confused as to why this happened, as the code change was minimal.

Root Cause Analysis:

Normally our Coordinator's ‘isReady()’ method always returns true. We changed it so that it could also return false, because we wanted to implement deactivation/activation of the topology (a graceful stop) instead of killing it cold.
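A minimal sketch of the idea, assuming a hypothetical `PausableCoordinator` class that only mirrors the `isReady()` method of Storm's coordinator and does not depend on Storm; the `paused` flag and the `pause()`/`resume()` methods are illustrative names, not Storm API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a transactional spout coordinator.
// It does NOT implement Storm's interface; it only mirrors isReady().
class PausableCoordinator {
    // Flipped from outside (e.g. by an admin command) to pause/resume emission.
    private final AtomicBoolean paused = new AtomicBoolean(false);

    // Storm calls isReady() repeatedly; false means "do not start this txid".
    public boolean isReady() {
        return !paused.get();
    }

    public void pause()  { paused.set(true); }
    public void resume() { paused.set(false); }

    public static void main(String[] args) {
        PausableCoordinator c = new PausableCoordinator();
        System.out.println(c.isReady()); // true
        c.pause();
        System.out.println(c.isReady()); // false
        c.resume();
        System.out.println(c.isReady()); // true
    }
}
```

An `AtomicBoolean` is used because the flag would typically be flipped from a different thread than the one calling `isReady()`.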

In addition, our topology configuration had:

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

i.e. up to 10 transactions could be in flight at the same time.

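What happens when ‘isReady()’ returns false?

According to the Coordinator Javadoc, the transaction is skipped. In TransactionalSpoutCoordinator.java, the loop below advances curr to the next transaction id whether or not a transaction was started for the current one, so every txid for which isReady() returned false is passed over: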

if(_activeTx.size() < _maxTransactionActive) {
    BigInteger curr = _currTransaction;
    for(int i=0; i<_maxTransactionActive; i++) {
        if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                && !_activeTx.containsKey(curr)) {
          ....
          Object state = _coordinatorState.getState(curr, _initializer);
          ....
        }
        curr = nextTransactionId(curr);   // txid++, even if nothing was started for curr
    }
}
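To see how a gap can appear, here is a minimal, Storm-free simulation of the loop above. Assumptions: txids are plain `long`s, `_coordinatorState.hasCache(curr)` is always false, `_activeTx` is empty, and `isReady()` is modelled as a `BooleanSupplier` that answers false for the first few calls and true afterwards, i.e. the topology is reactivated part-way through a pass. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Storm-free simulation of the transaction-starting loop.
// Assumes no cached state and no active transactions.
class SyncLoopSimulation {

    // Returns the txids for which getState() would be called in one pass.
    // (In Storm, the first of these would already hit the strict-order check.)
    static List<Long> txidsStarted(long currTransaction, int maxTransactionActive,
                                   BooleanSupplier isReady) {
        List<Long> started = new ArrayList<>();
        long curr = currTransaction;
        for (int i = 0; i < maxTransactionActive; i++) {
            if (isReady.getAsBoolean()) {
                started.add(curr);   // stands in for getState(curr, _initializer)
            }
            curr++;                  // advances even when isReady() was false
        }
        return started;
    }

    // isReady() that answers false for the first `falseCalls` calls, then true.
    static BooleanSupplier reactivatedAfter(int falseCalls) {
        int[] calls = {0};
        return () -> calls[0]++ >= falseCalls;
    }

    public static void main(String[] args) {
        // Last committed txid is 3, so the pass starts at 4.
        System.out.println(txidsStarted(4, 10, reactivatedAfter(2))); // [6, 7, 8, 9, 10, 11, 12, 13]
        System.out.println(txidsStarted(4, 1, reactivatedAfter(2)));  // []
    }
}
```

With a limit of 10, ids 4 and 5 are passed over and the first id handed to getState() is 6. With a limit of 1, the pass only ever looks at id 4, so nothing can be skipped; the next pass simply tries 4 again.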

What happens when ‘isReady()’ returns true again, i.e. when we reactivate the topology?

The same loop runs again, and for each transaction id it visits it calls

Object state = _coordinatorState.getState(curr, _initializer);

Because we have

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

the loop visits up to 10 transaction ids in a single pass, so it can step past ids that were skipped and call getState() on a later one.

Now look at the strict-order check inside the getState() method:

if(_strictOrder) {
    .....
    // prev is the highest txid below txid that already has stored state
    if(prev!=null && !prev.equals(txid.subtract(BigInteger.ONE))) {
        throw new IllegalStateException("Expecting previous txid state to be the previous transaction");
    }
    .....
}
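The check can be reproduced outside Storm with a minimal sketch. Assumptions: stored state is kept in a `TreeMap` keyed by txid (chosen here for illustration because `headMap`/`lastKey` give "the highest txid below this one"), the class name `StrictOrderCheck` is hypothetical, and only the single check quoted above is modelled, not the rest of getState().

```java
import java.math.BigInteger;
import java.util.SortedMap;
import java.util.TreeMap;

// Storm-free sketch of the strict-order check made in getState().
class StrictOrderCheck {
    private final TreeMap<BigInteger, Object> states = new TreeMap<>();

    Object getState(BigInteger txid) {
        if (!states.containsKey(txid)) {
            // prev = highest txid below txid that already has state
            SortedMap<BigInteger, Object> prevMap = states.headMap(txid);
            BigInteger prev = prevMap.isEmpty() ? null : prevMap.lastKey();
            if (prev != null && !prev.equals(txid.subtract(BigInteger.ONE))) {
                throw new IllegalStateException(
                        "Expecting previous txid state to be the previous transaction");
            }
            states.put(txid, "state-" + txid);
        }
        return states.get(txid);
    }

    public static void main(String[] args) {
        StrictOrderCheck s = new StrictOrderCheck();
        s.getState(BigInteger.valueOf(3));         // state of the last committed txid
        try {
            s.getState(BigInteger.valueOf(6));     // 6 - 1 = 5, but prev is 3
        } catch (IllegalStateException e) {
            System.out.println("txid 6: " + e.getMessage());
        }
        System.out.println("txid 4: " + s.getState(BigInteger.valueOf(4))); // 4 - 1 = 3, passes
    }
}
```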

Let's say we deactivate the topology, making isReady() return false.

Assume the last committed transaction is 3 and, after some ids have been skipped, the current one is 6.

From the check above, 6 - 1 = 5, which is not equal to the previous stored txid 3, and strict ordering requires the two to match.

Storm expects the next transaction to be 4 (since 4 - 1 == 3), but because 4 and 5 were skipped, that expectation is broken.

Hence the exception, because we have set

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

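If this is set to 1, the problem does not occur: the for loop runs only one iteration per pass and only ever looks at the current transaction id, so no id can be skipped ahead of it.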

I guess this is a bug.

Workaround:

Deactivate your topology by making isReady() return false, wait until all in-flight data has been processed (nothing left pending), and then kill your topology.
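The "wait until nothing is pending" step can be sketched as a simple polling helper. This is a hypothetical sketch: how the in-flight count is obtained (Storm UI, metrics, a counter maintained by your own bolts) is left to the caller as an `IntSupplier`, and the helper and class names are illustrative, not Storm API.

```java
import java.util.function.IntSupplier;

// Hypothetical helper for the workaround: poll until nothing is in flight.
class DrainThenKill {

    // Polls inFlight until it reports 0 or the timeout expires.
    // Returns true if the topology drained in time.
    static boolean awaitDrained(IntSupplier inFlight, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (inFlight.getAsInt() == 0) {
                return true;
            }
            Thread.sleep(pollMillis);
        }
        return inFlight.getAsInt() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] remaining = {3};
        // Fake in-flight counter that drops by one on every poll: 3, 2, 1, 0.
        IntSupplier fakeCounter = () -> remaining[0] > 0 ? remaining[0]-- : 0;
        if (awaitDrained(fakeCounter, 1000, 10)) {
            // Only now kill the topology, e.g. with `storm kill <topology-name>`.
            System.out.println("drained, safe to kill");
        } else {
            System.out.println("still busy, not killing yet");
        }
    }
}
```

Returning false on timeout, rather than killing anyway, keeps the decision about a stuck topology with the operator.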

Alternatively, set

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

at the cost of allowing only one transaction in flight at a time.

Hope this helps.
