RedisProtocolTransformer class
The RedisProtocolTransformer transforms a Redis byte stream into RedisReply objects. For details on the wire format, see the Redis protocol documentation.
/// Transforms chunks of bytes (`List<int>`) into [RedisReply] objects.
///
/// A reply may span several chunks, and one chunk may contain several
/// replies; the reply that is currently being assembled is kept in
/// [_currentReply] between calls to [handleData].
class RedisProtocolTransformer extends StreamEventTransformer<List<int>, RedisReply> {

  /// Charcode for status replies
  static const int PLUS = 43;

  /// Charcode for error replies
  static const int DASH = 45;

  /// Charcode for integer replies
  static const int COLON = 58;

  /// Charcode for bulk replies
  static const int DOLLAR = 36;

  /// Charcode for multi bulk replies
  static const int ASTERIX = 42;

  /**
   * If the transformer has already received data for a reply that is not yet
   * complete, this holds that [RedisReply]. It is `null` whenever no reply is
   * in progress.
   */
  RedisReply _currentReply;

  /// Converts a list of char codes to a String
  String _charCodesToString(List<int> bytes) => new String.fromCharCodes(bytes);

  /// Converts a char code to a String
  String _charCodeToString(int byte) => new String.fromCharCode(byte);

  /**
   * Handles the incoming [data] and adds [RedisReply] objects to [output]
   * as soon as they are complete.
   *
   * If [RedisReply.fromType] rejects the first byte of a new reply with a
   * [RedisProtocolTransformerException], the error is forwarded through
   * [handleError] and the rest of the chunk is dropped, because there is no
   * reply object that could consume it.
   */
  void handleData(List<int> data, EventSink<RedisReply> output) {
    List<int> pending = data;

    // Loop instead of recursing so that a chunk holding many replies does not
    // grow the call stack.
    while (pending != null && !pending.isEmpty) {
      if (_currentReply == null) {
        // Start of a fresh reply: the first byte selects the reply type.
        try {
          _currentReply = new RedisReply.fromType(pending.first);
        }
        on RedisProtocolTransformerException catch (e) {
          this.handleError(e, output);
          // _currentReply is still null here; continuing would dereference
          // null in _consumeData below.
          return;
        }
      }

      List<int> unconsumedData = _currentReply._consumeData(pending);

      // Make sure that unconsumedData can't be returned unless the reply is actually done.
      assert(unconsumedData == null || _currentReply.done);

      // Reply needs more bytes; wait for the next chunk.
      if (!_currentReply.done) return;

      // Reply is done!
      output.add(_currentReply);
      _currentReply = null;

      // Whatever is left over belongs to the next reply.
      pending = unconsumedData;
    }
  }

  /**
   * Closes [output]. If a reply was still incomplete when the stream ended,
   * an [UnexpectedRedisClosureError] is reported through [handleError] first.
   */
  void handleDone(EventSink<RedisReply> output) {
    if (_currentReply != null) {
      // Apparently some data has already been sent, but the stream is done.
      this.handleError(new UnexpectedRedisClosureError("Some data has already been sent but was not complete."), output);
    }
    output.close();
  }

}
Extends
StreamEventTransformer<List<int>, RedisReply> > RedisProtocolTransformer
Static Properties
const int ASTERIX #
Charcode for multi bulk replies
static const int ASTERIX = 42
const int COLON #
Charcode for integer replies
static const int COLON = 58
const int DASH #
Charcode for error replies
static const int DASH = 45
const int DOLLAR #
Charcode for bulk replies
static const int DOLLAR = 36
const int PLUS #
Charcode for status replies
static const int PLUS = 43
Methods
Stream<T> bind(Stream<S> source) #
inherited from StreamEventTransformer
/// Returns a new [EventTransformStream] built from [source] and this
/// transformer.
Stream<T> bind(Stream<S> source) =>
    new EventTransformStream<S, T>(source, this);
void handleData(List<int> data, EventSink<RedisReply> output) #
Actually handles the incoming data and adds RedisReply objects to the sink when they're ready.
/// Handles the incoming [data] and adds [RedisReply] objects to [output] as
/// soon as they are complete.
///
/// If [RedisReply.fromType] rejects the first byte of a new reply with a
/// [RedisProtocolTransformerException], the error is forwarded through
/// [handleError] and the rest of the chunk is dropped, because there is no
/// reply object that could consume it.
void handleData(List<int> data, EventSink<RedisReply> output) {
  List<int> pending = data;

  // Loop instead of recursing so that a chunk holding many replies does not
  // grow the call stack.
  while (pending != null && !pending.isEmpty) {
    if (_currentReply == null) {
      // Start of a fresh reply: the first byte selects the reply type.
      try {
        _currentReply = new RedisReply.fromType(pending.first);
      }
      on RedisProtocolTransformerException catch (e) {
        this.handleError(e, output);
        // _currentReply is still null here; continuing would dereference
        // null in _consumeData below.
        return;
      }
    }

    List<int> unconsumedData = _currentReply._consumeData(pending);

    // Make sure that unconsumedData can't be returned unless the reply is actually done.
    assert(unconsumedData == null || _currentReply.done);

    // Reply needs more bytes; wait for the next chunk.
    if (!_currentReply.done) return;

    // Reply is done!
    output.add(_currentReply);
    _currentReply = null;

    // Whatever is left over belongs to the next reply.
    pending = unconsumedData;
  }
}
void handleDone(EventSink<RedisReply> output) #
Closes the EventSink, first adding an error if a reply
was still incomplete when the stream ended.
/// Closes [output]. If a reply is still in progress at that point, an
/// [UnexpectedRedisClosureError] is passed to [handleError] before closing.
void handleDone(EventSink<RedisReply> output) {
  final bool replyInProgress = _currentReply != null;

  if (replyInProgress) {
    // The stream ended while a reply was only partially received.
    final closureError = new UnexpectedRedisClosureError(
        "Some data has already been sent but was not complete.");
    this.handleError(closureError, output);
  }

  output.close();
}