"""voldemort_client is a synchronous Python client for Voldemort How to start a Voldemort server for testing:: $ ./bin/voldemort-server.sh config/single_node_cluster/ TCP Usage:: v = Voldemort('tcp://localhost:6666') st = v.get_store('test') st.put('foo', bar') value, version = st.get('foo') assert value == 'bar' HTTP Usage:: v = Voldemort('http://localhost:8081') st = v.get_store('test') st.put('foo', bar') value, version = st.get('foo') assert value == 'bar' """ from __future__ import with_statement import time import array import struct import base64 import socket import httplib import urlparse import binascii from contextlib import closing from xml.etree import ElementTree as ET __version__ = '0.1' VERSION_HEADER = 'X-vldmt-version' CLUSTER_KEY = 'cluster.xml' STORES_KEY = 'stores.xml' MAX_NUMBER_OF_VERSIONS = 0x7fff class InconsistentDataError(ValueError): pass class ObsoleteVersionError(ValueError): pass def b2i_uint(arr, offset, num_bytes): rval = 0 for i in xrange(offset , offset + num_bytes): rval = (rval << 8) | arr[i] return rval def i2b_uint_len(n): for i in xrange(1, 8 + 1): if n < (1 << (8 * i)): return i raise ValueError("%r will not fit in 64-bits" % (n,)) def i2b_uint(value, num_bytes): """ >>> i2b_uint(256, 2) [1, 0] >>> i2b_uint(255, 2) [0, 255] """ return [(value >> i) & 0xff for i in xrange(8 * (num_bytes - 1), -1, -8)] def compare_vector_clocks(v1, v2): """Returns: * ``0`` if concurrent * ``-1`` if before; equal (arbitrarily) or v2 is a successor * ``1`` if after; v1 is a successor """ v1_bigger = False v2_bigger = False ver1 = v1.versions ver2 = v2.versions p1 = 0 p2 = 0 while p1 < len(ver1) and p2 < len(ver2): v1_node, v1_version = ver1[p1] v2_node, v2_version = ver2[p2] if v1_node == v2_node: if v1_version > v2_version: v1_bigger = True elif v2_version > v1_version: v2_bigger = True p1 += 1 p2 += 1 elif v1_node > v2_node: # v1 is missing a version that v2 has v2_bigger = True p2 += 1 else: # v2 is missing a version that v1 has v1_bigger = True p1 += 1 # check for leftover versions if p1 < len(ver1): v1_bigger = True elif p2 < len(ver2): v2_bigger = True if not (v1_bigger or v2_bigger): # return -1 arbitrarily for equal clocks return -1 elif v1_bigger and v2_bigger: # parallel clocks return 0 return 0 elif v1_bigger: # v1 is a successor to v2, return 1 return 1 else: # v2 is a successor to v1, return -1 return -1 class VectorClock(object): """ >>> v = VectorClock.from_base64('AAEBAAABAAABH030x/Y=') >>> v.versions == [(0, 1)] True >>> v.timestamp == 1233963501558L True >>> v.to_base64() == 'AAEBAAABAAABH030x/Y=' True """ def __init__(self, versions=None, timestamp=None): if timestamp is None: timestamp = int(time.time() * 1000) if versions is None: versions = [] self.timestamp = timestamp self.versions = versions @classmethod def from_bytes(cls, bytes): num_entries, version_size = struct.unpack('>HB', bytes[:3]) entry_size = 2 + version_size min_bytes = 2 + 1 + (num_entries * entry_size) + 8 if len(bytes) < min_bytes: raise ValueError( "Too few bytes: expected at least %d but found only %d" % ( min_bytes, len(bytes))) a = array.array('B', bytes[3:3 + min_bytes]) index = 0 entries = [] for _ in xrange(num_entries): node_id = (a[index] << 8) | a[index + 1] version = b2i_uint(a, index + 2, version_size) entries.append((node_id, version)) index += entry_size timestamp = struct.unpack('>Q', bytes[index + 3:index + 3 + 8])[0] return cls(entries, timestamp) @classmethod def from_base64(cls, s): return cls.from_bytes(base64.standard_b64decode(s)) def to_bytes(self): max_version = (max(version for (node_id, version) in self.versions) if self.versions else 0) version_size = i2b_uint_len(max_version) num_entries = len(self.versions) a = array.array('B', [ (num_entries >> 8) & 0xff, num_entries & 0xff, version_size, ]) for (node_id, version) in self.versions: a.extend([(node_id >> 8) & 0xff, node_id & 0xff] + i2b_uint(version, version_size)) return a.tostring() + struct.pack('>Q', self.timestamp) def to_base64(self): return base64.standard_b64encode(self.to_bytes()) @property def size_in_bytes(self): max_version = (max(version for (node_id, version) in self.versions) if self.versions else 0) version_size = i2b_uint_len(max_version) return 2 + 1 + (len(self.versions) * (2 + version_size)) + 8 def __repr__(self): return '%s(%r, %r)' % ( type(self).__name__, self.versions, self.timestamp) def __hash__(self): return hash(self.versions) def __eq__(self, other): if not isinstance(other, VectorClock): raise TypeError("VectorClock can only be compared to VectorClock") # NOTE: the timestamp on the VectorClock is not used for equality return (self is other) or (self.versions == other.versions) def incremented(self, node_id, timestamp=None): versions = list(self.versions) if 0 > node_id > 0x7fff: raise ValueError( "%r is outside of the acceptable range of node ids" % ( node_id,)) for i, (v_node_id, version) in enumerate(self.versions): if v_node_id == node_id: versions[i] = (v_node_id, 1 + version) break else: if len(versions) > MAX_NUMBER_OF_VERSIONS: raise ValueError("Vector clock is full!") versions.append((node_id, 1)) return type(self)(versions, timestamp) def utf8_str(s): """``s.encode('utf-8') if isinstance(s, unicode) else str(s)`` >>> isinstance(utf8_str(u''), str) True >>> utf8_str(1) == '1' True """ return s.encode('utf-8') if isinstance(s, unicode) else str(s) def fnv_hash(bytes): hash = 0x811c9dc5 fnv_prime = 0x1000193 for c in array.array('B', bytes): hash = 0xffffffff & ((hash ^ c) * fnv_prime) if hash & 0x80000000: return hash - 0x100000000 else: return hash class StringSerializer(object): def __init__(self, schema_map={}, has_version=False): assert schema_map == {} #assert has_version == False self.schema_map = schema_map self.has_version = has_version self.newest_version = max(schema_map) if schema_map else 0 def to_bytes(self, s): return utf8_str(s) def from_bytes(self, bytes): return bytes.decode('utf-8') def __repr__(self): ATTRS = 'schema_map', 'has_version' return '<%s %s>' % ( type(self).__name__, ' '.join('%s=%r' % (k, getattr(self, k)) for k in ATTRS)) class NotJSONSerializer(object): def __init__(self, schema_map, has_version): assert schema_map == {0: '"string"'} assert has_version == True self.schema_map = schema_map self.has_version = has_version self.newest_version = max(schema_map) if schema_map else 0 def to_bytes(self, s): if self.has_version: v = chr(self.newest_version) else: v = '' if s is None: return v + struct.pack('>h', -1) s = utf8_str(s) return v + struct.pack('>h', len(s)) + s def from_bytes(self, bytes): offset = 0 if self.has_version: v = ord(bytes[0]) offset += 1 else: v = 0 assert self.schema_map[v] == '"string"' string_len = struct.unpack('>h', bytes[offset:offset + 2])[0] offset += 2 if string_len == -1: return None return bytes[offset:offset + string_len].decode('utf-8') def __repr__(self): ATTRS = 'schema_map', 'has_version' return '<%s %s>' % ( type(self).__name__, ' '.join('%s=%r' % (k, getattr(self, k)) for k in ATTRS)) def serializer(name, schema_map, has_version): SERIALIZERS = { 'json': NotJSONSerializer, 'string': StringSerializer, } return SERIALIZERS[name](schema_map, has_version) def serializer_from_xml(x): schema_map = {} has_version = True for info in x.findall('schema-info'): v = x.get('version') if v is None: version = 0 elif v == 'none': version = 0 has_version = False else: version = int(v) schema_map[version] = info.text return serializer( name=x.findtext('type'), schema_map=schema_map, has_version=has_version, ) def socksend(sock, lst): for chunk in lst: sock.sendall(chunk) def sockrecv(sock, bytes): d = '' while len(d) < bytes: d += sock.recv(min(8192, bytes - len(d))) return d class VoldemortTCP(object): OP_CODE = dict(GET=1, PUT=2, DELETE=3) def __init__(self, host, socket_port): self.host = host self.socket_port = socket_port self.conn = self.get_connection() def get_connection(self): sock = socket.socket() sock.connect((self.host, self.socket_port)) sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) return sock def send_cmd(self, conn, op, store_name, packed_key, extra): store_name = utf8_str(store_name) iolist = [ chr(self.OP_CODE[op]), struct.pack('>h', len(store_name)), store_name, struct.pack('>i', len(packed_key)), packed_key, ] + extra socksend(conn, iolist) err_code = struct.unpack('>h', sockrecv(conn, 2))[0] if err_code == 0: return err_msg_len = struct.unpack('>h', sockrecv(conn, 2))[0] err_msg = sockrecv(conn, err_msg_len) if err_code == 4: raise ObsoleteVersionError(err_msg) elif err_code == 8: raise InconsistentDataError(err_msg) else: raise ValueError("Unknown Voldemort error code %d: %s" % (err_code, err_msg)) def get_raw(self, store_name, packed_key): res = [] conn = self.conn self.send_cmd(conn, 'GET', store_name, packed_key, []) num_results = struct.unpack('>i', sockrecv(conn, 4))[0] for i in xrange(num_results): chunk_len = struct.unpack('>i', sockrecv(conn, 4))[0] chunk = sockrecv(conn, chunk_len) clock = VectorClock.from_bytes(chunk) res.append((chunk[clock.size_in_bytes:], clock)) return res def put_raw(self, store_name, packed_key, packed_value, version): packed_version = version.to_bytes() chunk = packed_version + packed_value conn = self.conn self.send_cmd(conn, 'PUT', store_name, packed_key, [ struct.pack('>i', len(chunk)), chunk, ]) def delete_raw(self, store_name, packed_key, version): packed_version = version.to_bytes() conn = self.conn self.send_cmd(conn, 'DELETE', store_name, packed_key, [ struct.pack('>h', len(packed_version)), packed_version, ]) return sockrecv(conn, 1) == '\x01' class VoldemortHTTP(object): def __init__(self, host, http_port): self.host = host self.http_port = http_port def get_connection(self): """Get the raw :class:`HTTPConnection` to Voldemort""" return httplib.HTTPConnection('%s:%s' % (self.host, self.http_port)) def store_path(self, store_name, packed_key): return '/%s/%s' % ( store_name, binascii.b2a_hex(packed_key)) def http(self, conn, method, path, data=None, version=None): """Make a HTTP request to Voldemort, return the response body. * *method* must be ``'GET'``, ``'PUT'`` or ``'DELETE'`` """ headers = {} if not path[:1] == '/': path = '/' + path if method == 'GET': if data is not None: raise TypeError("data must be None for GET") elif method == 'DELETE': if data is not None: raise TypeError("data must be None for DELETE") if version is None: raise TypeError("version is required for DELETE") headers[VERSION_HEADER] = version.to_base64() elif method == 'PUT': if data is None: raise TypeError("data must not be None for PUT") if version is None: raise TypeError("version is required for DELETE") headers[VERSION_HEADER] = version.to_base64() headers['Content-length'] = str(len(data)) else: raise ValueError("Voldemort does not support method %r" % ( method,)) conn.request(method, path, data, headers) response = conn.getresponse() status, body = response.status, response.read() if 200 <= status < 300: return body elif status == 409: junk = body[body.index('
'):body.rindex('
') + 6] message = ET.XML(ET.XML(junk).text).findtext('message') raise ObsoleteVersionError(message) else: raise ValueError("Voldemort response failure for %s %s %s" % ( path, response.status, response.reason)) def get_raw(self, store_name, packed_key): path = self.store_path(store_name, packed_key) with closing(self.get_connection()) as conn: bytes = self.http(conn, 'GET', path) index = 0 res = [] while index < len(bytes): size = struct.unpack('>i', bytes[index:index + 4])[0] index += 4 chunk = bytes[index:index + size] index += size clock = VectorClock.from_bytes(chunk) res.append((chunk[clock.size_in_bytes:], clock)) return res def put_raw(self, store_name, packed_key, packed_value, version): path = self.store_path(store_name, packed_key) with closing(self.get_connection()) as conn: self.http(conn, 'PUT', path, packed_value, version) def delete_raw(self, store_name, packed_key, version): path = self.store_path(store_name, packed_key) with closing(self.get_connection()) as conn: self.http(conn, 'DELETE', path, None, version) class Node(object): def __init__(self, id, host, http_port, socket_port, partitions): self.id = id self.host = host self.http_port = http_port try: socket_port = int(socket_port) except (ValueError, TypeError): socket_port = None self.socket_port = socket_port self.partitions = partitions if socket_port: t = VoldemortTCP(host, socket_port) else: t = VoldemortHTTP(host, http_port) self.transport = t self.get_raw = t.get_raw self.delete_raw = t.delete_raw self.put_raw = t.put_raw @classmethod def from_xml(cls, server): return Node( id=int(server.findtext('id')), host=server.findtext('host'), http_port=server.findtext('http-port'), socket_port=server.findtext('socket-port'), partitions=[ int(s.strip()) for s in server.findtext('partitions').split(',') if s.strip() ], ) @classmethod def from_url(cls, url): scheme, netloc, _path, _query, _frag = urlparse.urlsplit(url) socket_port = None http_port = None if scheme == 'http': host, _, http_port = netloc.partition(':') if not http_port: http_port = '8081' # JD urlparse.urlsplit doesn't work for tcp: but it does for ftp: elif scheme == 'ftp': host, _, socket_port = netloc.partition(':') if not socket_port: socket_port = '6666' else: raise ValueError("Invalid URL scheme for Voldemort %r" % (url,)) return cls( id=None, host=host, http_port=http_port, socket_port=socket_port, partitions=None) def __eq__(self, other): return self.id == other.id def __hash__(self): return hash(self.id) def __repr__(self): ATTRS = 'id', 'host', 'http_port', 'socket_port', 'partitions' return '<%s %s>' % ( type(self).__name__, ' '.join('%s=%r' % (k, getattr(self, k)) for k in ATTRS)) class Store(object): def __init__(self, name, persistence, routing, replication_factor, required_reads, required_writes, preferred_reads, preferred_writes, retention_days, key_serializer, value_serializer): self.name = name self.persistence = persistence self.routing = routing self.replication_factor = replication_factor self.required_reads = required_reads self.required_writes = required_writes self.preferred_reads = preferred_reads self.preferred_writes = preferred_writes self.retention_days = retention_days self.key_serializer = key_serializer self.value_serializer = value_serializer @classmethod def from_xml(cls, store): def int_optional(x, k): s = x.findtext(k) return int(s) if s else None return cls( name=store.findtext('name'), persistence=store.findtext('persistence'), routing=store.findtext('routing'), replication_factor=int(store.findtext('replication-factor')), required_reads=int(store.findtext('required-reads')), required_writes=int(store.findtext('required-writes')), preferred_reads=int_optional(store, 'preferred-reads'), preferred_writes=int_optional(store, 'preferred-reads'), retention_days=int_optional(store, 'retention-days'), key_serializer=serializer_from_xml(store.find('key-serializer')), value_serializer=serializer_from_xml( store.find('value-serializer')), ) def __repr__(self): ATTRS = ('name', 'persistence', 'routing', 'replication_factor', 'required_reads', 'required_writes', 'preferred_reads', 'preferred_writes', 'retention_days', 'key_serializer', 'value_serializer') return '<%s %s>' % ( type(self).__name__, ' '.join('%s=%r' % (k, getattr(self, k)) for k in ATTRS)) class ConsistentRouter(object): def __init__(self, store, nodes): self.store = store self.nodes = nodes self.num_replicas = store.replication_factor pmap = {} for node in nodes: for partition in node.partitions: if partition in pmap: raise ValueError( "Duplicate partition id %s in cluster configuration" % (partition,)) pmap[partition] = node plist = [] for i in xrange(len(pmap)): try: plist.append(pmap[i]) except KeyError: raise ValueError("Missing tag %s" % (i,)) self.partitions = plist def route_request(self, key): p = self.partitions num_results = self.num_replicas res = [] index = abs(fnv_hash(key)) % len(p) for _i in xrange(len(p)): node = p[index] if node not in res: res.append(node) if len(res) >= num_results: # we have enough results, go home return res index = (index + 1) % len(p) # we don't have enough results, but that might be ok return res def resolve_conflicts(self, pairs): # VectorClock based inconsistency resolver if len(pairs) <= 1: return pairs pairs.sort(cmp=lambda a,b: compare_vector_clocks(a[1], b[1])) last_data, last_clock = pairs.pop() concurrent = [(last_data, last_clock)] for (data, clock) in reversed(pairs): if compare_vector_clocks(clock, last_clock) == 0: concurrent.append((data, clock)) else: break pairs = concurrent # timestamp based inconsistency resolver if len(pairs) <= 1: return pairs max_timestamp = None max_pair = None for pair in pairs: if max_timestamp is None or pair[1].timestamp > max_timestamp: max_pair = pair return [max_pair] def read_repair(self, store_name, packed_key, retrieved): if len(retrieved) <= 1: return retrieved # TODO: implement read repair return retrieved def get_raw(self, store_name, packed_key): results = [] for node in self.route_request(packed_key): res = node.get_raw(store_name, packed_key) results.extend((node, v) for v in res) results = self.read_repair(store_name, packed_key, results) return self.resolve_conflicts([v for (node, v) in results]) def delete_raw(self, store_name, packed_key, version): for node in self.route_request(packed_key): node.delete_raw(store_name, packed_key, version) def put_raw(self, store_name, packed_key, packed_value, version): nodes = self.route_request(packed_key) master = nodes[0] master_v = version.incremented(master.id) master.put_raw(store_name, packed_key, packed_value, master_v) for node in nodes[1:]: node.put_raw(store_name, packed_key, packed_value, master_v) return version.incremented(master.id) class VoldemortStore(object): def __init__(self, connection, store_name, key_serializer, value_serializer): self.connection = connection self.store_name = store_name self.key_serializer = key_serializer self.value_serializer = value_serializer def get(self, key, default=None): packed_key = self.key_serializer.to_bytes(key) results = self.connection.get_raw(self.store_name, packed_key) if not results: return default, None elif len(results) == 1: chunk, clock = results[0] return self.value_serializer.from_bytes(chunk), clock else: raise InconsistentDataError( "Unresolved versions for key %r" % (key,)) def get_value(self, key, default=None): return self.get(key, default)[0] def locate(self, key): packed_key = self.key_serializer.to_bytes(key) return self.connection.route_request(packed_key) def put(self, key, value, version=None): if version is None: version = self.get(key)[1] or VectorClock() packed_key = self.key_serializer.to_bytes(key) packed_value = self.value_serializer.to_bytes(value) return self.connection.put_raw(self.store_name, packed_key, packed_value, version) def delete(self, key): version = self.get(key)[1] if version is None: return False packed_key = self.key_serializer.to_bytes(key) self.connection.delete_raw(self.store_name, packed_key, version) return True class Voldemort(object): """Voldemort HTTP connection """ def __init__(self, bootstrap_url): """Create a Voldemort connection """ self.bootstrap_url = bootstrap_url # name, nodes, stores, routing come from bootstrap info self.name = None self.nodes = None self.stores = None self.bootstrap() def __repr__(self): return '<%s name=%r bootstrap_url=%r>' % ( type(self).__name__, self.name, self.bootstrap_url) def get_store(self, store_name): store = self.stores[store_name] return VoldemortStore(ConsistentRouter(store, self.nodes), store_name, store.key_serializer, store.value_serializer) def bootstrap(self): store = VoldemortStore(Node.from_url(self.bootstrap_url), 'metadata', StringSerializer(), StringSerializer()) cluster_xml = store.get(CLUSTER_KEY)[0] stores_xml = store.get(STORES_KEY)[0] if cluster_xml is None or stores_xml is None: raise ValueError( "Couldn't bootstrap cluster.xml and/or stores.xml") cx = ET.XML(cluster_xml) self.name = cx.findtext('name') self.nodes = map(Node.from_xml, cx.findall('server')) sx = ET.XML(stores_xml) self.stores = dict( (store.name, store) for store in map(Store.from_xml, sx.findall('store'))) def main(): import doctest doctest.testmod() if __name__ == '__main__': main()