Dart Documentation > redis_protocol_transformer > RedisProtocolTransformer

RedisProtocolTransformer class

The RedisProtocolTransformer transforms a Redis byte stream into RedisReply objects. For details on the wire format, see the Redis protocol documentation.

/**
 * Stream transformer that turns chunks of bytes (`List<int>`) into
 * [RedisReply] objects.
 *
 * A reply may span several incoming chunks, and one chunk may contain several
 * replies. The reply currently being assembled is kept in [_currentReply]
 * between calls to [handleData].
 */
class RedisProtocolTransformer extends StreamEventTransformer<List<int>, RedisReply> {

  /// Charcode ('+') for status replies
  static const int PLUS = 43;

  /// Charcode ('-') for error replies
  static const int DASH = 45;

  /// Charcode (':') for integer replies
  static const int COLON = 58;

  /// Charcode ('$') for bulk replies
  static const int DOLLAR = 36;

  /// Charcode ('*') for multi bulk replies
  static const int ASTERIX = 42;

  /**
   * The [RedisReply] that is currently being assembled, or `null` when no
   * reply is in progress.
   */
  RedisReply _currentReply;

  /// Converts a list of char codes to a String
  String _charCodesToString(List<int> bytes) => new String.fromCharCodes(bytes);

  /// Converts a char code to a String
  String _charCodeToString(int byte) => new String.fromCharCode(byte);

  /**
   * Feeds [data] into the reply under construction and adds every completed
   * [RedisReply] to [output].
   *
   * If no reply is in progress, a new one is created from the first byte of
   * the pending data. When [RedisReply.fromType] rejects that byte with a
   * [RedisProtocolTransformerException], the exception is forwarded through
   * [handleError] and the rest of the chunk is dropped, because there is no
   * reply object that could consume it.
   */
  void handleData(List<int> data, EventSink<RedisReply> output) {
    List<int> pending = data;

    // Loop instead of recursing so that a chunk containing many small replies
    // does not deepen the call stack with every reply.
    while (pending != null && !pending.isEmpty) {
      if (_currentReply == null) {
        // Start of a fresh reply: the first byte selects the reply type.
        try {
          _currentReply = new RedisReply.fromType(pending.first);
        }
        on RedisProtocolTransformerException catch (e) {
          this.handleError(e, output);
          // _currentReply is still null here; continuing would dereference it.
          return;
        }
      }

      List<int> unconsumedData = _currentReply._consumeData(pending);

      // Leftover bytes may only be handed back once the reply is complete.
      assert(unconsumedData == null || _currentReply.done);

      if (!_currentReply.done) {
        // The reply needs more data; wait for the next chunk.
        return;
      }

      output.add(_currentReply);
      _currentReply = null;
      pending = unconsumedData;
    }
  }

  /**
   * Closes the [EventSink]. If a reply is still incomplete at that point, an
   * [UnexpectedRedisClosureError] is reported through [handleError] first.
   */
  void handleDone(EventSink<RedisReply> output) {

    if (_currentReply != null) {
      // Some data has been received for a reply, but the stream ended before
      // the reply was complete.
      this.handleError(new UnexpectedRedisClosureError("Some data has already been sent but was not complete."), output);
    }

    output.close();
  }
}

Extends

StreamEventTransformer<List<int>, RedisReply> > RedisProtocolTransformer

Static Properties

const int ASTERIX #

Charcode for multi bulk replies

static const int ASTERIX = 42

const int COLON #

Charcode for integer replies

static const int COLON = 58

const int DASH #

Charcode for error replies

static const int DASH = 45

const int DOLLAR #

Charcode for bulk replies

static const int DOLLAR = 36

const int PLUS #

Charcode for status replies

static const int PLUS = 43

Methods

Stream<T> bind(Stream<S> source) #

inherited from StreamEventTransformer
/// Returns a new [EventTransformStream] built from [source] and this
/// transformer.
Stream<T> bind(Stream<S> source) =>
    new EventTransformStream<S, T>(source, this);

void handleData(List<int> data, EventSink<RedisReply> output) #

Actually handles the incoming data and adds RedisReply objects to the sink when they're ready.

/**
 * Feeds [data] into the reply under construction and adds every completed
 * [RedisReply] to [output].
 *
 * If no reply is in progress, a new one is created from the first byte of
 * the pending data. When [RedisReply.fromType] rejects that byte with a
 * [RedisProtocolTransformerException], the exception is forwarded through
 * [handleError] and the rest of the chunk is dropped, because there is no
 * reply object that could consume it.
 */
void handleData(List<int> data, EventSink<RedisReply> output) {
  List<int> pending = data;

  // Loop instead of recursing so that a chunk containing many small replies
  // does not deepen the call stack with every reply.
  while (pending != null && !pending.isEmpty) {
    if (_currentReply == null) {
      // Start of a fresh reply: the first byte selects the reply type.
      try {
        _currentReply = new RedisReply.fromType(pending.first);
      }
      on RedisProtocolTransformerException catch (e) {
        this.handleError(e, output);
        // _currentReply is still null here; continuing would dereference it.
        return;
      }
    }

    List<int> unconsumedData = _currentReply._consumeData(pending);

    // Leftover bytes may only be handed back once the reply is complete.
    assert(unconsumedData == null || _currentReply.done);

    if (!_currentReply.done) {
      // The reply needs more data; wait for the next chunk.
      return;
    }

    output.add(_currentReply);
    _currentReply = null;
    pending = unconsumedData;
  }
}

void handleDone(EventSink<RedisReply> output) #

Closes the EventSink, first adding an error if a reply is still incomplete.

/**
 * Closes [output]. If a reply is still under construction at that point, an
 * [UnexpectedRedisClosureError] is passed to [handleError] before closing.
 */
void handleDone(EventSink<RedisReply> output) {
  final bool replyPending = _currentReply != null;

  if (replyPending) {
    // The stream ended while a reply was only partially received.
    final closureError = new UnexpectedRedisClosureError("Some data has already been sent but was not complete.");
    this.handleError(closureError, output);
  }

  output.close();
}

void handleError(error, EventSink<T> sink) #

inherited from StreamEventTransformer

Act on incoming error event.

The method may generate any number of events on the sink, but should not throw.

/// Forwards [error] to [sink] by calling `sink.addError`.
void handleError(error, EventSink<T> sink) => sink.addError(error);