Commit 4a23242f authored by Kamil Kisiel's avatar Kamil Kisiel

Merge pull request #1 from dgryski/read-graphite-carbon

Add support for enough opcodes to read Graphite's Rendering stream
parents b9dde364 add8993d
......@@ -6,75 +6,82 @@ package ogórek
import (
// Opcodes
const (
opMark = "(" // push special markobject on stack
opStop = "." // every pickle ends with STOP
opPop = "0" // discard topmost stack item
opPopMark = "1" // discard stack top through topmost markobject
opDup = "2" // duplicate top stack item
opFloat = "F" // push float object; decimal string argument
opInt = "I" // push integer or bool; decimal string argument
opBinint = "J" // push four-byte signed int
opBinint1 = "K" // push 1-byte unsigned int
opLong = "L" // push long; decimal string argument
opBinint2 = "M" // push 2-byte unsigned int
opNone = "N" // push None
opPersid = "P" // push persistent object; id is taken from string arg
opBinpersid = "Q" // " " " ; " " " " stack
opReduce = "R" // apply callable to argtuple, both on stack
opString = "S" // push string; NL-terminated string argument
opBinstring = "T" // push string; counted binary string argument
opShortBinstring = "U" // " " ; " " " " < 256 bytes
opUnicode = "V" // push Unicode string; raw-unicode-escaped"d argument
opBinunicode = "X" // " " " ; counted UTF-8 string argument
opAppend = "a" // append stack top to list below it
opBuild = "b" // call __setstate__ or __dict__.update()
opGlobal = "c" // push self.find_class(modname, name); 2 string args
opDict = "d" // build a dict from stack items
opEmptyDict = "}" // push empty dict
opAppends = "e" // extend list on stack by topmost stack slice
opGet = "g" // push item from memo on stack; index is string arg
opBinget = "h" // " " " " " " ; " " 1-byte arg
opInst = "i" // build & push class instance
opLongBinget = "j" // push item from memo on stack; index is 4-byte arg
opList = "l" // build list from topmost stack items
opEmptyList = "]" // push empty list
opObj = "o" // build & push class instance
opPut = "p" // store stack top in memo; index is string arg
opBinput = "q" // " " " " " ; " " 1-byte arg
opLongBinput = "r" // " " " " " ; " " 4-byte arg
opSetitem = "s" // add key+value pair to dict
opTuple = "t" // build tuple from topmost stack items
opEmptyTuple = ")" // push empty tuple
opSetitems = "u" // modify dict by adding topmost key+value pairs
opBinfloat = "G" // push float; arg is 8-byte float encoding
opMark byte = '(' // push special markobject on stack
opStop = '.' // every pickle ends with STOP
opPop = '0' // discard topmost stack item
opPopMark = '1' // discard stack top through topmost markobject
opDup = '2' // duplicate top stack item
opFloat = 'F' // push float object; decimal string argument
opInt = 'I' // push integer or bool; decimal string argument
opBinint = 'J' // push four-byte signed int
opBinint1 = 'K' // push 1-byte unsigned int
opLong = 'L' // push long; decimal string argument
opBinint2 = 'M' // push 2-byte unsigned int
opNone = 'N' // push None
opPersid = 'P' // push persistent object; id is taken from string arg
opBinpersid = 'Q' // " " " ; " " " " stack
opReduce = 'R' // apply callable to argtuple, both on stack
opString = 'S' // push string; NL-terminated string argument
opBinstring = 'T' // push string; counted binary string argument
opShortBinstring = 'U' // " " ; " " " " < 256 bytes
opUnicode = 'V' // push Unicode string; raw-unicode-escaped"d argument
opBinunicode = 'X' // " " " ; counted UTF-8 string argument
opAppend = 'a' // append stack top to list below it
opBuild = 'b' // call __setstate__ or __dict__.update()
opGlobal = 'c' // push self.find_class(modname, name); 2 string args
opDict = 'd' // build a dict from stack items
opEmptyDict = '}' // push empty dict
opAppends = 'e' // extend list on stack by topmost stack slice
opGet = 'g' // push item from memo on stack; index is string arg
opBinget = 'h' // " " " " " " ; " " 1-byte arg
opInst = 'i' // build & push class instance
opLongBinget = 'j' // push item from memo on stack; index is 4-byte arg
opList = 'l' // build list from topmost stack items
opEmptyList = ']' // push empty list
opObj = 'o' // build & push class instance
opPut = 'p' // store stack top in memo; index is string arg
opBinput = 'q' // " " " " " ; " " 1-byte arg
opLongBinput = 'r' // " " " " " ; " " 4-byte arg
opSetitem = 's' // add key+value pair to dict
opTuple = 't' // build tuple from topmost stack items
opEmptyTuple = ')' // push empty tuple
opSetitems = 'u' // modify dict by adding topmost key+value pairs
opBinfloat = 'G' // push float; arg is 8-byte float encoding
opTrue = "I01\n" // not an opcode; see INT docs in
opFalse = "I00\n" // not an opcode; see INT docs in
// Protocol 2
opProto = "\x80" // identify pickle protocol
opNewobj = "\x81" // build object by applying cls.__new__ to argtuple
opExt1 = "\x82" // push object from extension registry; 1-byte index
opExt2 = "\x83" // ditto, but 2-byte index
opExt4 = "\x84" // ditto, but 4-byte index
opTuple1 = "\x85" // build 1-tuple from stack top
opTuple2 = "\x86" // build 2-tuple from two topmost stack items
opTuple3 = "\x87" // build 3-tuple from three topmost stack items
opNewtrue = "\x88" // push True
opNewfalse = "\x89" // push False
opLong1 = "\x8a" // push long from < 256 bytes
opLong4 = "\x8b" // push really big long
opProto = '\x80' // identify pickle protocol
opNewobj = '\x81' // build object by applying cls.__new__ to argtuple
opExt1 = '\x82' // push object from extension registry; 1-byte index
opExt2 = '\x83' // ditto, but 2-byte index
opExt4 = '\x84' // ditto, but 4-byte index
opTuple1 = '\x85' // build 1-tuple from stack top
opTuple2 = '\x86' // build 2-tuple from two topmost stack items
opTuple3 = '\x87' // build 3-tuple from three topmost stack items
opNewtrue = '\x88' // push True
opNewfalse = '\x89' // push False
opLong1 = '\x8a' // push long from < 256 bytes
opLong4 = '\x8b' // push really big long
var ErrNotImplemented = errors.New("unimplemented opcode")
var ErrUnknownOptocde = errors.New("unknown opcode")
var ErrInvalidPickleVersion = errors.New("invalid pickle version")
// special marker
type mark struct{}
......@@ -96,7 +103,10 @@ func NewDecoder(r io.Reader) Decoder {
// Decode decodes the pickle stream and returns the result or an error.
func (d Decoder) Decode() (interface{}, error) {
insn := 0
for {
key, err := d.r.ReadByte()
if err == io.EOF {
......@@ -104,7 +114,7 @@ func (d Decoder) Decode() (interface{}, error) {
return nil, err
switch string(key) {
switch key {
case opMark:
case opStop:
......@@ -120,75 +130,81 @@ func (d Decoder) Decode() (interface{}, error) {
case opInt:
err = d.loadInt()
case opBinint:
err = d.loadBinInt()
case opBinint1:
err = d.loadBinInt1()
case opLong:
err = d.loadLong()
case opBinint2:
err = d.loadBinInt2()
case opNone:
err = d.loadNone()
case opPersid:
err = d.loadPersid()
case opBinpersid:
err = d.loadBinPersid()
case opReduce:
err = d.reduce()
case opString:
err = d.loadString()
case opBinstring:
err = d.loadBinString()
case opShortBinstring:
err = d.loadShortBinString()
case opUnicode:
err = d.loadUnicode()
case opBinunicode:
err = d.loadBinUnicode()
case opAppend:
err = d.loadAppend()
case opBuild:
err =
case opGlobal:
err =
case opDict:
err = d.loadDict()
case opEmptyDict:
err = d.loadEmptyDict()
case opAppends:
err = d.loadAppends()
case opGet:
err = d.get()
case opBinget:
err = d.binGet()
case opInst:
err = d.inst()
case opLongBinget:
err = d.longBinGet()
case opList:
err = d.loadList()
case opEmptyList:
case opObj:
err = d.obj()
case opPut:
err = d.loadPut()
case opBinput:
err = d.binPut()
case opLongBinput:
err = d.longBinPut()
case opSetitem:
err = d.loadSetItem()
case opTuple:
err = d.loadTuple()
case opEmptyTuple:
case opSetitems:
err = d.loadSetItems()
case opBinfloat:
err = d.binFloat()
case opProto:
v, _ := d.r.ReadByte()
if v != 2 {
err = ErrInvalidPickleVersion
return nil, fmt.Errorf("Unknown opcode: %q", key)
return nil, fmt.Errorf("Unknown opcode %d (%c) at isns %d: %q", key, key, insn, key)
if err != nil {
......@@ -200,7 +216,7 @@ func (d Decoder) Decode() (interface{}, error) {
// Push a marker
func (d *Decoder) mark() {
// Return the position of the topmost marker
......@@ -216,20 +232,21 @@ func (d *Decoder) marker() int {
// Append a new value
func (d *Decoder) append(v interface{}) {
func (d *Decoder) push(v interface{}) {
d.stack = append(d.stack, v)
// Pop a value
func (d *Decoder) pop() interface{} {
v := d.stack[len(d.stack)-1]
d.stack = d.stack[:len(d.stack)-1]
ln := len(d.stack) - 1
v := d.stack[ln]
d.stack = d.stack[:ln]
return v
// Discard the stack through to the topmost marker
func (d *Decoder) popMark() {
func (d *Decoder) popMark() error {
return ErrNotImplemented
// Duplicate the top stack item
......@@ -247,7 +264,7 @@ func (d *Decoder) loadFloat() error {
if err != nil {
return err
return nil
......@@ -273,16 +290,23 @@ func (d *Decoder) loadInt() error {
val = i
return nil
// Push a four-byte signed int
func (d *Decoder) loadBinInt() {
func (d *Decoder) loadBinInt() error {
var v int32
binary.Read(d.r, binary.LittleEndian, &v)
return nil
// Push a 1-byte unsigned int
func (d *Decoder) loadBinInt1() {
func (d *Decoder) loadBinInt1() error {
b, _ := d.r.ReadByte()
return nil
// Push a long
......@@ -293,30 +317,33 @@ func (d *Decoder) loadLong() error {
v := new(big.Int)
v.SetString(string(line[:len(line)-1]), 10)
return nil
// Push a 2-byte unsigned int
func (d *Decoder) loadBinInt2() {
func (d *Decoder) loadBinInt2() error {
return ErrNotImplemented
// Push None
func (d *Decoder) loadNone() {
func (d *Decoder) loadNone() error {
return nil
// Push a persistent object id
func (d *Decoder) loadPersid() {
func (d *Decoder) loadPersid() error {
return ErrNotImplemented
// Push a persistent object id from items on the stack
func (d *Decoder) loadBinPersid() {
func (d *Decoder) loadBinPersid() error {
return ErrNotImplemented
func (d *Decoder) reduce() {
func (d *Decoder) reduce() error {
return ErrNotImplemented
func decodeStringEscape(b []byte) string {
......@@ -345,15 +372,20 @@ func (d *Decoder) loadString() error {
return fmt.Errorf("insecure string")
d.append(decodeStringEscape(line[1 : len(line)-1]))
d.push(decodeStringEscape(line[1 : len(line)-1]))
return nil
func (d *Decoder) loadBinString() {
func (d *Decoder) loadBinString() error {
return ErrNotImplemented
func (d *Decoder) loadShortBinString() {
func (d *Decoder) loadShortBinString() error {
b, _ := d.r.ReadByte()
s := make([]byte, b)
return nil
func (d *Decoder) loadUnicode() error {
......@@ -381,12 +413,12 @@ func (d *Decoder) loadUnicode() error {
return fmt.Errorf("characters remaining after loadUnicode operation: %s", sline)
return nil
func (d *Decoder) loadBinUnicode() {
func (d *Decoder) loadBinUnicode() error {
return ErrNotImplemented
func (d *Decoder) loadAppend() error {
......@@ -402,15 +434,15 @@ func (d *Decoder) loadAppend() error {
return nil
func (d *Decoder) build() {
func (d *Decoder) build() error {
return ErrNotImplemented
func (d *Decoder) global() {
func (d *Decoder) global() error {
return ErrNotImplemented
func (d *Decoder) loadDict() {
func (d *Decoder) loadDict() error {
k := d.marker()
m := make(map[interface{}]interface{}, 0)
items := d.stack[k+1:]
......@@ -418,59 +450,63 @@ func (d *Decoder) loadDict() {
m[items[i]] = items[i+1]
d.stack = append(d.stack[:k], m)
return nil
func (d *Decoder) loadEmptyDict() {
func (d *Decoder) loadEmptyDict() error {
m := make(map[interface{}]interface{}, 0)
return nil
func (d *Decoder) loadAppends() error {
k := d.marker()
l := d.stack[len(d.stack)-1]
l := d.stack[k-1]
switch l.(type) {
case []interface{}:
l := l.([]interface{})
for _, v := range d.stack[k:len(d.stack)] {
for _, v := range d.stack[k+1 : len(d.stack)] {
l = append(l, v)
d.stack = append(d.stack[:k], l)
d.stack = append(d.stack[:k-1], l)
return fmt.Errorf("loadAppends expected a list, got %t", l)
return nil
func (d *Decoder) get() {
func (d *Decoder) get() error {
return ErrNotImplemented
func (d *Decoder) binGet() {
func (d *Decoder) binGet() error {
return ErrNotImplemented
func (d *Decoder) inst() {
func (d *Decoder) inst() error {
return ErrNotImplemented
func (d *Decoder) longBinGet() {
func (d *Decoder) longBinGet() error {
return ErrNotImplemented
func (d *Decoder) loadList() {
func (d *Decoder) loadList() error {
k := d.marker()
v := append([]interface{}{}, d.stack[k+1:]...)
d.stack = append(d.stack[:k], v)
return nil
func (d *Decoder) loadTuple() {
func (d *Decoder) loadTuple() error {
k := d.marker()
v := append([]interface{}{}, d.stack[k+1:]...)
d.stack = append(d.stack[:k], v)
return nil
func (d *Decoder) obj() {
func (d *Decoder) obj() error {
return ErrNotImplemented
func (d *Decoder) loadPut() error {
......@@ -482,12 +518,14 @@ func (d *Decoder) loadPut() error {
return nil
func (d *Decoder) binPut() {
func (d *Decoder) binPut() error {
b, _ := d.r.ReadByte()
d.memo[strconv.Itoa(int(b))] = d.stack[len(d.stack)-1]
return nil
func (d *Decoder) longBinPut() {
func (d *Decoder) longBinPut() error {
return ErrNotImplemented
func (d *Decoder) loadSetItem() error {
......@@ -504,10 +542,25 @@ func (d *Decoder) loadSetItem() error {
return nil
func (d *Decoder) setItems() {
func (d *Decoder) loadSetItems() error {
k := d.marker()
l := d.stack[k-1]
switch m := l.(type) {
case map[interface{}]interface{}:
for i := k + 1; i < len(d.stack); i += 2 {
m[d.stack[i]] = d.stack[i+1]
d.stack = append(d.stack[:k-1], m)
return fmt.Errorf("loadSetItems expected a map, got %t", m)
return nil
func (d *Decoder) binFloat() {
func (d *Decoder) binFloat() error {
f := make([]byte, 8)
u := binary.BigEndian.Uint64(f)
d.stack = append(d.stack, math.Float64frombits(u))
return nil
......@@ -2,6 +2,7 @@ package ogórek
import (
......@@ -13,43 +14,6 @@ func bigInt(s string) *big.Int {
return i
func equal(a, b interface{}) bool {
if reflect.TypeOf(a) != reflect.TypeOf(b) {
return false
switch a.(type) {
case []interface{}:
ia := a.([]interface{})
ib := b.([]interface{})
if len(ia) != len(ib) {
return false
for i := 0; i < len(ia); i++ {
if !equal(ia[i], ib[i]) {
return false
return true
case map[interface{}]interface{}:
ia := a.(map[interface{}]interface{})
ib := b.(map[interface{}]interface{})
if len(ia) != len(ib) {
return false
for k := range ia {
if !equal(ia[k], ib[k]) {
return false
return true
case *big.Int:
return a.(*big.Int).Cmp(b.(*big.Int)) == 0
return a == b
func TestMarker(t *testing.T) {
buf := bytes.Buffer{}
dec := NewDecoder(&buf)
......@@ -59,6 +23,9 @@ func TestMarker(t *testing.T) {
var graphitePickle1, _ = hex.DecodeString("80025d71017d710228550676616c75657371035d71042847407d90000000000047407f100000000000474080e0000000000047409764000000000047409c40000000000047409d88000000000047409f74000000000047409c74000000000047409cdc00000000004740a10000000000004740a0d800000000004740938800000000004740a00e00000000004740988800000000004e4e655505737461727471054a00d87a5255047374657071064a805101005503656e6471074a00f08f5255046e616d657108552d5a5a5a5a2e55555555555555552e43434343434343432e4d4d4d4d4d4d4d4d2e5858585858585858582e545454710975612e")
var graphitePickle2, _ = hex.DecodeString("286c70300a286470310a53277374617274270a70320a49313338333738323430300a73532773746570270a70330a4938363430300a735327656e64270a70340a49313338353136343830300a73532776616c756573270a70350a286c70360a463437332e300a61463439372e300a61463534302e300a6146313439372e300a6146313830382e300a6146313839302e300a6146323031332e300a6146313832312e300a6146313834372e300a6146323137362e300a6146323135362e300a6146313235302e300a6146323035352e300a6146313537302e300a614e614e617353276e616d65270a70370a5327757365722e6c6f67696e2e617265612e6d616368696e652e6d65747269632e6d696e757465270a70380a73612e")
func TestDecode(t *testing.T) {
tests := []struct {
name string
......@@ -79,6 +46,8 @@ func TestDecode(t *testing.T) {
{"unicode", "V\\u65e5\\u672c\\u8a9e\np0\n.", string("日本語")},
{"empty dict", "(dp0\n.", make(map[interface{}]interface{})},
{"dict with strings", "(dp0\nS'a'\np1\nS'1'\np2\nsS'b'\np3\nS'2'\np4\ns.", map[interface{}]interface{}{"a": "1", "b": "2"}},
{"graphite message1", string(graphitePickle1), []interface{}{map[interface{}]interface{}{"values": []interface{}{float64(473), float64(497), float64(540), float64(1497), float64(1808), float64(1890), float64(2013), float64(1821), float64(1847), float64(2176), float64(2156), float64(1250), float64(2055), float64(1570), None{}, None{}}, "start": int64(1383782400), "step": int64(86400), "end": int64(1385164800), "name": "ZZZZ.UUUUUUUU.CCCCCCCC.MMMMMMMM.XXXXXXXXX.TTT"}}},
{"graphite message2", string(graphitePickle2), []interface{}{map[interface{}]interface{}{"values": []interface{}{float64(473), float64(497), float64(540), float64(1497), float64(1808), float64(1890), float64(2013), float64(1821), float64(1847), float64(2176), float64(2156), float64(1250), float64(2055), float64(1570), None{}, None{}}, "start": int64(1383782400), "step": int64(86400), "end": int64(1385164800), "name": "user.login.area.machine.metric.minute"}}},
for _, test := range tests {
buf := bytes.NewBufferString(test.input)
......@@ -87,8 +56,9 @@ func TestDecode(t *testing.T) {
if err != nil {
if !equal(v, test.expected) {
t.Errorf("%s: got %q expected %q",, v, test.expected)
if !reflect.DeepEqual(v, test.expected) {
t.Errorf("%s: got\n%q\n expected\n%q",, v, test.expected)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment