#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Key/value store benchmark back ends.

Each class below wraps one client library behind the same
store/fetch/delete interface and spreads requests round-robin across the
servers registered with add_server().  A failed operation is reported by
raising RuntimeError.
"""
import hashlib
import random
import string
import struct
import sys
import time


class _RoundRobin:
    """Connection bookkeeping shared by the store wrappers below.

    Subclasses append one client object per server to conn_list.  Each
    operation uses conn_list[curr_conn] and, only when it succeeds, moves
    on to the next entry (wrapping around).
    """

    def __init__(self):
        self.conn_list = []
        self.curr_conn = 0

    def _current(self):
        """Return the client object to use for the next operation."""
        return self.conn_list[self.curr_conn]

    def _advance(self):
        """Move on to the next client object, wrapping at the end."""
        self.curr_conn = (self.curr_conn + 1) % len(self.conn_list)


class Tabled(_RoundRobin):
    """tabled back end, accessed through boto's S3 API.

    info must be [user, password, bucket].
    """

    def __init__(self, info):
        boto.set_file_logger("boto", "/tmp/boto.out", logging.WARNING)
        try:
            self.user, self.pw, self.bucket = info
        except (TypeError, ValueError):
            sys.stderr.write("tabled needs user, pw, bucket\n")
            sys.exit(2)
        _RoundRobin.__init__(self)
        # Parallel to conn_list: the Bucket object for each connection.
        self.bucket_list = []

    def add_server(self, host, port):
        cf = boto.s3.connection.OrdinaryCallingFormat()
        new_conn = Connection(self.user, self.pw, host=host, port=port,
                              is_secure=False, calling_format=cf, debug=0)
        self.conn_list.append(new_conn)
        self.bucket_list.append(Bucket(new_conn, self.bucket))

    def store(self, key, data):
        k = Key(self.bucket_list[self.curr_conn])
        k.key = key
        k.set_contents_from_string(data)
        self._advance()

    def fetch(self, key):
        k = Key(self.bucket_list[self.curr_conn])
        k.key = key
        # NOTE(review): a missing key presumably surfaces as a boto
        # exception rather than RuntimeError, so it is not handled like a
        # miss in the other back ends -- confirm against boto.
        k.get_contents_as_string()
        self._advance()

    def delete(self, key):
        self.bucket_list[self.curr_conn].delete_key(key)
        self._advance()


class Cassandra(_RoundRobin):
    """Cassandra back end over Thrift.

    info must be [keyspace, column family]; values go in a column
    named "data".
    """

    def __init__(self, info):
        try:
            self.keyspace, self.column_family = info
        except (TypeError, ValueError):
            sys.stderr.write("cassandra needs keyspace, column family\n")
            sys.exit(2)
        _RoundRobin.__init__(self)

    def add_server(self, host, port):
        # Import the generated service module locally.  At module level the
        # name Cassandra is this class until the Thrift module of the same
        # name is imported over it, so don't rely on that global here.
        from cassandra import Cassandra as cassandra_service
        sock = TSocket.TSocket(host, port)
        transport = TTransport.TBufferedTransport(sock)
        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
        client = cassandra_service.Client(protocol)
        transport.open()
        self.conn_list.append(client)

    def store(self, key, data):
        client = self._current()
        column_path = ctt.ColumnPath(column_family=self.column_family,
                                     column="data")
        # NOTE(review): time.time() (float seconds) is the write timestamp;
        # confirm what the server expects before changing it, and keep
        # delete() on the same clock.
        client.insert(self.keyspace, key, column_path, data, time.time(),
                      ctt.ConsistencyLevel.ONE)
        self._advance()

    def fetch(self, key):
        client = self._current()
        column_parent = ctt.ColumnParent(column_family=self.column_family)
        # Empty start/finish: no bounds on the column range.
        slice_range = ctt.SliceRange(start="", finish="")
        predicate = ctt.SlicePredicate(slice_range=slice_range)
        result = client.get_slice(self.keyspace, key, column_parent,
                                  predicate, ctt.ConsistencyLevel.ONE)
        if not result:
            raise RuntimeError("hell")
        self._advance()

    def delete(self, key):
        client = self._current()
        column_path = ctt.ColumnPath(column_family=self.column_family)
        client.remove(self.keyspace, key, column_path, time.time(),
                      ctt.ConsistencyLevel.ONE)
        self._advance()


class Riak(_RoundRobin):
    """Riak back end through the jiak client.  info must be [bucket]."""

    def __init__(self, info):
        try:
            self.bucket = info[0]
        except (IndexError, TypeError):
            sys.stderr.write("riak needs bucket\n")
            sys.exit(2)
        _RoundRobin.__init__(self)

    def add_server(self, host, port):
        jc = jiak.JiakClient(host, port)
        jc.set_bucket_schema(self.bucket, ["data"])
        self.conn_list.append(jc)

    def store(self, key, data):
        jo = jiak.JiakObject(self.bucket, key)
        jo.object["data"] = data
        self._current().store(jo)
        self._advance()

    def fetch(self, key):
        if self._current().fetch(self.bucket, key) is None:
            raise RuntimeError("hell")
        self._advance()

    def delete(self, key):
        self._current().delete(self.bucket, key)
        self._advance()


class Voldemort(_RoundRobin):
    """Voldemort back end.  info must be [store name]."""

    def __init__(self, info):
        try:
            self.store_name = info[0]
        except (IndexError, TypeError):
            sys.stderr.write("voldemort needs store\n")
            sys.exit(2)
        _RoundRobin.__init__(self)

    def add_server(self, host, port):
        # NOTE(review): "ftp://" looks like an unusual scheme for a
        # bootstrap URL -- confirm it is what this voldemort module expects.
        v = voldemort.Voldemort("ftp://%s:%d" % (host, port))
        self.conn_list.append(v.get_store(self.store_name))

    def store(self, key, data):
        self._current().put(key, data)
        self._advance()

    def fetch(self, key):
        data, _version = self._current().get(key)
        if data is None:
            raise RuntimeError("hell")
        self._advance()

    def delete(self, key):
        if not self._current().delete(key):
            raise RuntimeError("hell")
        self._advance()


class Tyrant(_RoundRobin):
    """Tyrant back end via pytyrant's dict-style interface; info is unused."""

    def __init__(self, info):
        _RoundRobin.__init__(self)

    def add_server(self, host, port):
        self.conn_list.append(pytyrant.PyTyrant.open(host, port))

    def store(self, key, data):
        self._current()[key] = data
        self._advance()

    def fetch(self, key):
        try:
            self._current()[key]
        except KeyError:
            # Report a miss as RuntimeError, like the other back ends, so
            # callers handling RuntimeError see it instead of a crash.
            raise RuntimeError("hell")
        self._advance()

    def delete(self, key):
        try:
            del self._current()[key]
        except KeyError:
            raise RuntimeError("hell")
        self._advance()


def _chunkd_key(key):
    """Return the md5 hex digest used as the chunkd key for *key*.

    Chunkd's keyspace is more limited than the others, so we accept the
    possibility of collision.  This is reasonable for performance testing,
    but don't do it for real code.  Text keys are UTF-8 encoded first.
    """
    if not isinstance(key, bytes):
        key = key.encode("utf-8")
    return hashlib.md5(key).hexdigest()


class Chunkd(_RoundRobin):
    """chunkd back end.  info must be [user, password].

    fetch() reads self.dsize, which the caller sets after construction.
    """

    def __init__(self, info):
        try:
            self.user, self.pw = info
        except (TypeError, ValueError):
            sys.stderr.write("chunkd needs user, pw\n")
            sys.exit(2)
        _RoundRobin.__init__(self)

    def add_server(self, host, port):
        cd = chunkd.Chunkd(host, port, self.user, self.pw)
        self.conn_list.append(cd)

    def store(self, key, data):
        if not self._current().put(_chunkd_key(key), data):
            raise RuntimeError("hell")
        self._advance()

    def fetch(self, key):
        data = self._current().get(_chunkd_key(key), self.dsize)
        if data is None:
            raise RuntimeError("hell")
        self._advance()

    def delete(self, key):
        if not self._current().delete(_chunkd_key(key)):
            raise RuntimeError("hell")
        self._advance()


class Keyspace(_RoundRobin):
    """Keyspace back end: a single client object knows every server."""

    def __init__(self, info):
        _RoundRobin.__init__(self)
        # "host:port" strings for every server added so far.
        self.string_list = []

    def add_server(self, host, port):
        # Rebuild one client over the full server list each time, so
        # conn_list always holds exactly one entry.
        self.conn_list = []
        self.string_list.append("%s:%d" % (host, port))
        self.conn_list.append(keyspace.Client(self.string_list))

    def store(self, key, data):
        self._current().set(key, data)
        self._advance()

    def fetch(self, key):
        if self._current().get(key) is None:
            raise RuntimeError("hell")
        self._advance()

    def delete(self, key):
        self._current().delete(key)
        self._advance()

# The client modules are imported inside try/except blocks below so that
# (a) people can run without having every single interface installed, and
# (b) we don't have to keep re-importing: an import (or __import__) inside a
# class/method definition only binds the name locally and doesn't carry
# over to subsequent method invocations.  For the "import x" case we could
# do "globals()['x'] = x" and get the right effect, but the
# "from x import y" case is a lot hairier.
# Registry of available back ends, keyed by the name given as arg 3.
algo_list = {}

# Tabled/boto
try:
    import boto
    import logging
    from boto.s3 import Connection, Bucket, Key
    algo_list["tabled"] = Tabled
except ImportError:
    pass

# Cassandra/thrift
try:
    from thrift import Thrift
    from thrift.transport import TTransport
    from thrift.transport import TSocket
    from thrift.protocol import TBinaryProtocol
    # "from cassandra import Cassandra" rebinds the global name Cassandra
    # from the wrapper class defined above to the Thrift-generated module.
    # Registering Cassandra afterwards would register that (non-callable)
    # module, so keep a reference to the wrapper class first.  The module
    # binding itself is left in place, as before.
    _cassandra_store = Cassandra
    from cassandra import Cassandra
    import cassandra.ttypes as ctt
    algo_list["cassandra"] = _cassandra_store
except ImportError:
    pass

# Riak/jiak
try:
    import jiak
    algo_list["riak"] = Riak
except ImportError:
    pass

# Voldemort
try:
    import voldemort
    algo_list["voldemort"] = Voldemort
except ImportError:
    pass

# Tyrant/pytyrant
try:
    import pytyrant
    algo_list["tyrant"] = Tyrant
except ImportError:
    pass

# chunkd
try:
    import chunkd
    algo_list["chunkd"] = Chunkd
except ImportError:
    pass

# Keyspace
try:
    import keyspace
    algo_list["keyspace"] = Keyspace
except ImportError:
    pass


def Usage():
    """Write the command-line usage summary to stderr."""
    err = sys.stderr.write
    err("Usage: %s\n" % sys.argv[0])
    err("arg 1: read, write, or delete\n")
    err("arg 2: data item size\n")
    err("arg 3: %s\n" % ", ".join(sorted(algo_list)))
    err("arg 4: comma-separated host:port list\n")
    err("arg 5: numeric ID for this run\n")
    err("any other arguments are passed to the store object\n")


# Length of one benchmark run, in seconds.
RUN_SECONDS = 60.0

if __name__ == "__main__":
    # Validate everything before connecting to any server.  Only argument
    # errors are caught here, so a store constructor's own sys.exit(2)
    # keeps its exit status.
    try:
        op, dsize, algo, srv_list, index = sys.argv[1:6]
        if op not in ("read", "write", "delete"):
            raise ValueError("unknown operation %r" % op)
        seed = int(index)
        servers = []
        for server in srv_list.split(","):
            host, port = server.split(":")
            servers.append((host, int(port)))
        algo_obj = algo_list[algo](sys.argv[6:])
        algo_obj.dsize = int(dsize)
    except (ValueError, KeyError):
        Usage()
        sys.exit(1)

    # Seeding with the run ID makes runs with the same ID generate the
    # same key sequence.
    random.seed(seed)
    for host, port in servers:
        algo_obj.add_server(host, port)
    data = struct.pack("%ds" % algo_obj.dsize, "")

    if op == "write":
        run_op = lambda key: algo_obj.store(key, data)
    elif op == "read":
        run_op = algo_obj.fetch
    else:
        run_op = algo_obj.delete

    begin_time = time.time()
    deadline = begin_time + RUN_SECONDS
    done = 0
    last_key = None
    while True:
        key = "%dkey%s" % (random.randint(0, 999999999), index)
        try:
            run_op(key)
        except RuntimeError:
            # The store objects raise RuntimeError when an operation fails
            # (e.g. a fetch finds no data); the run ends there.
            end_time = time.time()
            break
        last_key = key
        done += 1
        now = time.time()
        if now > deadline:
            end_time = now
            break

    if last_key:
        sys.stderr.write("%s: last key was %s\n" % (index, last_key))
    sys.stdout.write("Did %d operations in %.1f seconds\n"
                     % (done, end_time - begin_time))