import sys
import socket
import atexit
import struct
from cStringIO import StringIO
#from threading import local
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models.base import Model
# --- wire-protocol constants (must match the searchd build being queried) ---
# known searchd commands
SEARCHD_COMMAND_SEARCH = 0
SEARCHD_COMMAND_EXCERPT = 1
# current client-side command implementation versions
# (high byte = major, low byte = minor; see SphinxClient._get_response)
VER_COMMAND_SEARCH = 0x104
VER_COMMAND_EXCERPT = 0x100
# known searchd status codes
SEARCHD_OK = 0
SEARCHD_ERROR = 1
SEARCHD_RETRY = 2
# known match modes
SPH_MATCH_ALL = 0
SPH_MATCH_ANY = 1
SPH_MATCH_PHRASE = 2
SPH_MATCH_BOOLEAN = 3
SPH_MATCH_EXTENDED = 4
# known sort modes
SPH_SORT_RELEVANCE = 0
SPH_SORT_ATTR_DESC = 1
SPH_SORT_ATTR_ASC = 2
SPH_SORT_TIME_SEGMENTS = 3
SPH_SORT_EXTENDED = 4
# known attribute types
SPH_ATTR_INTEGER = 1
SPH_ATTR_TIMESTAMP = 2
# known grouping functions
SPH_GROUPBY_DAY = 0
SPH_GROUPBY_WEEK = 1
SPH_GROUPBY_MONTH = 2
SPH_GROUPBY_YEAR = 3
SPH_GROUPBY_ATTR = 4
# --- exception hierarchy, all rooted at SphinxError ---
class SphinxError(Exception): pass
# could not establish/handshake a searchd connection
class SphinxConnectionError(SphinxError): pass
# socket-level read/write failure mid-request
class SphinxTransportError(SphinxError): pass
# response from searchd could not be unpacked
class SphinxFormatError(SphinxError): pass
# request could not be packed from the given arguments
class SphinxInputError(SphinxError): pass
# searchd asked the client to retry (SEARCHD_RETRY)
class SphinxTemporaryError(SphinxError): pass
class SphinxClient(object): #(local):
    """Client to send requests to Sphinx searchd and interpret results.
    The client can be used for multiple requests against the same host/port.
    """
    def __init__(self, host='localhost', port=3312, timeout=0):
        """Set timeout to 0 or None if you do not need it."""
        self.host = host
        self.port = port
        if timeout:
            # NOTE(review): this sets the *process-wide* default socket
            # timeout, affecting every socket created afterwards, not just
            # this client's connection.
            socket.setdefaulttimeout(timeout)
        # lazily opened in _connect(), closed in _disconnect()
        self._sock = None
    def query(self, query, index='*', offset=0, limit=20, **kw):
        """Get search results from searchd.
        Returns a dict with keys 'fields', 'attrs', 'matches' (list of
        (doc_id, weight, attrs_dict) tuples), 'total', 'total_found',
        'time' and 'words'.
        Keyword arguments:
        query -- search terms as a single string
        index -- the index to query against, defaults to '*'
        offset -- fetch results starting from offset, defaults to 0
        limit -- number of results to return, defaults to 20
        mode -- search mode, defaults to SPH_MATCH_ALL
        sort -- sort mode, defaults to SPH_SORT_RELEVANCE
        sortby -- sort-by clause sent verbatim to searchd (string)
        weights -- per-field weights (list of ints)
        min_id -- lowest document id to match, defaults to 0
        max_id -- highest document id to match, defaults to 0xFFFFFFFF
        filter -- dict mapping attribute name -> list of accepted values
        min_ -- dict mapping attribute name -> range minimum (paired with max_)
        max_ -- dict mapping attribute name -> range maximum (paired with min_)
        groupby -- attribute name to group by (string)
        groupfunc -- defaults to SPH_GROUPBY_DAY
        maxmatches -- defaults to 1000
        """
        # NOTE: docstring says mode defaults to SPH_MATCH_ALL, but the code
        # actually defaults to SPH_MATCH_EXTENDED.
        mode = kw.get('mode', SPH_MATCH_EXTENDED)
        sort = kw.get('sort', SPH_SORT_RELEVANCE)
        sortby = kw.get('sortby', '')
        weights = kw.get('weights', [])
        min_id = kw.get('min_id', 0)
        max_id = kw.get('max_id', 0xFFFFFFFF)
        filter = kw.get('filter', {})
        min_ = kw.get('min_', {})
        max_ = kw.get('max_', {})
        groupby = kw.get('groupby', '')
        groupfunc = kw.get('groupfunc', SPH_GROUPBY_DAY)
        maxmatches = kw.get('maxmatches', 1000)
        # check args
        assert SPH_MATCH_ALL <= mode <= SPH_MATCH_EXTENDED
        assert SPH_SORT_RELEVANCE <= sort <= SPH_SORT_EXTENDED
        assert min_id <= max_id
        assert SPH_GROUPBY_DAY <= groupfunc <= SPH_GROUPBY_ATTR
        # build request -- field order below is the searchd wire format and
        # must not be reordered
        buffer = StringIO()
        try:
            # offset, limit, mode, sort
            buffer.write(struct.pack('!LLLL', offset, limit, mode, sort))
            # sortby, query (strings are length-prefixed with a 32-bit int)
            for v in [sortby, query]:
                buffer.write(struct.pack('!L', len(v)))
                buffer.write(v)
            # weights
            buffer.write(struct.pack('!L', len(weights)))
            for w in weights:
                buffer.write(struct.pack('!L', w))
            # index
            buffer.write(struct.pack('!L', len(index)))
            buffer.write(index)
            # id range, followed by the combined count of range + value filters
            for i in (min_id, max_id, len(min_) + len(filter)):
                buffer.write(struct.pack('!L', i))
            # range filters: name, 0 (= range type), min, max
            for k, v in min_.items():
                buffer.write(struct.pack('!L', len(k)))
                buffer.write(k)
                buffer.write(struct.pack('!LLL', 0, v, max_[k]))
            # value filters: name, value count, values
            for k, values in filter.items():
                buffer.write(struct.pack('!L', len(k)))
                buffer.write(k)
                buffer.write(struct.pack('!L', len(values)))
                for v in values:
                    buffer.write(struct.pack('!L', v))
            # groupby
            buffer.write(struct.pack('!LL', groupfunc, len(groupby)))
            buffer.write(groupby)
            # maxmatches
            buffer.write(struct.pack('!L', maxmatches))
            data = buffer.getvalue()
            # prepend the command header: command id, version, payload length
            req = struct.pack('!HHL', SEARCHD_COMMAND_SEARCH, VER_COMMAND_SEARCH, len(data)) + data
        except (struct.error, TypeError), e:
            raise SphinxInputError, "Error generating request, %s" % e, sys.exc_info()[2]
        self._connect()
        self._write(req)
        # read back result
        result = dict()
        data = self._get_response(VER_COMMAND_SEARCH)
        data_len = len(data)
        pos = 0
        fields = list()
        try:
            # field names: count, then length-prefixed strings
            num = struct.unpack('!L', data[pos:pos+4])[0]
            pos += 4
            while len(fields) < num and pos < data_len:
                l = struct.unpack('!L', data[pos:pos+4])[0]
                pos += 4
                fields.append(data[pos:pos+l])
                pos += l
            result['fields'] = fields
            # attributes: count, then (name, type) pairs
            attrs = list()
            num = struct.unpack('!L', data[pos:pos+4])[0]
            pos += 4
            while len(attrs) < num and pos < data_len:
                l = struct.unpack('!L', data[pos:pos+4])[0]
                pos += 4
                k = data[pos:pos+l]
                pos += l
                v = struct.unpack('!L', data[pos:pos+4])[0]
                pos += 4
                attrs.append((k, v))
            result['attrs'] = attrs
            # matches: count, then (doc id, weight, one value per attribute)
            matches = []
            num_matches = struct.unpack('!L', data[pos:pos+4])[0]
            pos += 4
            while len(matches) < num_matches and pos < len(data):
                doc = struct.unpack('!L', data[pos:pos+4])[0]
                weight = struct.unpack('!L', data[pos+4:pos+8])[0]
                pos += 8
                doc_attrs = dict()
                # attribute values appear in the same order as 'attrs' above
                for attr, attr_value in attrs:
                    doc_attrs[attr] = struct.unpack('!L', data[pos:pos+4])[0]
                    pos += 4
                matches.append((doc, weight, doc_attrs))
            result['matches'] = matches
            # summary counters, then per-word statistics
            result['total'], result['total_found'], result['time'], num_words = struct.unpack('!LLLL', data[pos:pos+16])
            pos += 16
            words = dict()
            for i in range(num_words):
                l = struct.unpack('!L', data[pos:pos+4])[0]
                pos += 4
                word = data[pos:pos+l]
                pos += l
                docs, hits = struct.unpack('!LL', data[pos:pos+8])
                pos += 8
                words[word] = dict(docs=docs, hits=hits)
        except (struct.error, TypeError), e:
            raise SphinxFormatError, "error unpacking result, %s" % e, sys.exc_info()[2]
        result['words'] = words
        self._disconnect()
        return result
    def build_excerpts(self, docs, index, words='', **kw):
        """Get excerpts from searchd for a list of documents.
        Returns a list of excerpt strings, one per entry in docs.
        Keyword arguments:
        docs -- a list of document bodies
        index -- the index to use
        words -- keywords to highlight as a single string
        before_match -- prefix for keyword matches, defaults to ''
        after_match -- suffix for keyword matches, defaults to ''
        chunk_separator - defaults to ' ... '
        limit - defaults to 256
        around - defaults to 5
        """
        before_match = kw.get('before_match', '')
        after_match = kw.get('after_match', '')
        chunk_separator = kw.get('chunk_separator', ' ... ')
        limit = kw.get('limit', 256)
        around = kw.get('around', 5)
        # build request
        buffer = StringIO()
        try:
            # mode (0) and flags (1) -- fixed values for this command version
            buffer.write(struct.pack('!LL', 0, 1))
            # five length-prefixed option strings, in protocol order
            for opt in (index, words, before_match, after_match, chunk_separator):
                buffer.write(struct.pack('!L', len(opt)))
                buffer.write(opt)
            buffer.write(struct.pack('!L', limit))
            buffer.write(struct.pack('!L', around))
            # document bodies: count, then length-prefixed strings
            buffer.write(struct.pack('!L', len(docs)))
            for d in docs:
                buffer.write(struct.pack('!L', len(d)))
                buffer.write(d)
            data = buffer.getvalue()
            # prepend the command header: command id, version, payload length
            req = struct.pack('!HHL', SEARCHD_COMMAND_EXCERPT, VER_COMMAND_EXCERPT, len(data)) + data
        except (struct.error, TypeError), e:
            raise SphinxInputError, "Error generating request, %s" % e, sys.exc_info()[2]
        self._connect()
        self._write(req)
        # read back result: one length-prefixed excerpt per input document
        result = list()
        data = self._get_response(VER_COMMAND_EXCERPT)
        data_len = len(data)
        pos = 0
        try:
            for d in docs:
                l = struct.unpack('!L', data[pos:pos+4])[0]
                pos += 4
                result.append(data[pos:pos+l])
                pos += l
        except (struct.error, TypeError), e:
            raise SphinxFormatError, "error unpacking result, %s" % e, sys.exc_info()[2]
        self._disconnect()
        return result
    def _get_response(self, client_version):
        # fetch the response from searchd and split it in header and data
        header = self._read(8)
        try:
            status, version, length = struct.unpack('!HHL', header)
        except (ValueError, struct.error), e:
            raise SphinxError, "error unpacking response header %s" % e
        data = self._read(length)
        # on error/retry the first 4 bytes are a length prefix for the
        # message text, hence data[4:]
        if status == SEARCHD_ERROR:
            raise SphinxError, data[4:]
        if status == SEARCHD_RETRY:
            raise SphinxTemporaryError, data[4:]
        if status != SEARCHD_OK:
            raise SphinxError, "unkown status code %s" % status
        if version < client_version:
            # TODO: use logging
            print >>sys.stderr, "searchd command v.%d.%d older than client's v.%d.%d" % (
                version >> 8, version & 0xff, client_version >> 8, client_version & 0xff)
        return data
    def _read(self, length):
        # read exactly `length` bytes from the socket, looping over short reads
        msg = list()
        received = 0
        while received < length:
            try:
                chunk = self._sock.recv(length - received)
            except socket.error, e:
                raise SphinxTransportError, "error while reading from socket, %s" % e
            # empty chunk means the peer closed the connection
            if chunk == '':
                raise SphinxError, "socket connection broken, read %s bytes" % received
            msg.append(chunk)
            received += len(chunk)
        return ''.join(msg)
    def _write(self, buffer):
        # write the whole buffer to the socket, looping over short writes;
        # returns the number of bytes sent
        sent = 0
        while sent < len(buffer):
            try:
                s = self._sock.send(buffer[sent:])
            except socket.error, e:
                raise SphinxTransportError, "error while writing to socket, %s" % e
            # send() returning 0 means the peer closed the connection
            if s == 0:
                raise SphinxError, "socket connection broken"
            sent += s
        return sent
    def _connect(self):
        # connect to searchd and perform the protocol-version handshake;
        # no-op if a connection is already open
        if not self._sock:
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect((self.host, self.port))
                self._sock = s
                # make sure the socket is closed on interpreter exit
                atexit.register(self._disconnect)
                # server sends its protocol version first
                data = struct.unpack('!L', self._read(4))
                if not len(data):
                    raise SphinxConnectionError, "no version received from server"
                if data[0] < 1:
                    raise SphinxConnectionError, "invalid protocol version %s received from server" % data[0]
                # announce our protocol version
                self._write(struct.pack('!L', 1))
            except socket.error, e:
                raise SphinxConnectionError, e
            except struct.error, e:
                raise SphinxConnectionError, e
    def _disconnect(self):
        # disconnect from searchd; safe to call when already disconnected
        if self._sock:
            try:
                self._sock.close()
            except socket.error:
                pass
            self._sock = None
class SphinxSearch(object):
def __init__(self, index=None, excerpts_field=None, **kw):
# TODO: basic search parameters
self._model = None
self.index = kw.get('index')
if excerpts_field:
if isinstance(excerpts_field, tuple):
try:
attr, field = excerpts_field
except ValueError:
raise AssertionError("excerpts_fields has to be a string or a two-element tuple.")
elif isinstance(excerpts_field, basestring):
attr = excerpts_field
field = None
else:
raise AssertionError("excerpts_fields has to be a string or a two-element tuple.")
charset = getattr(settings, 'DEFAULT_CHARSET', 'utf-8')
def func(o):
text = getattr(o, attr)
if field:
text = getattr(text, field)
if isinstance(text, unicode):
text = text.encode(charset, 'replace')
return text
self._get_excerpt = func
self._build_excerpts = True
self._query = None
self._query_cache = None
self._offset = 0
self._limit = 20
self._client = SphinxClient(
getattr(settings, 'SPHINX_SERVER', None),
getattr(settings, 'SPHINX_PORT', None))
self.before_match = getattr(settings, 'SPHINX_EXCERPT_BEFORE_MATCH', '')
self.after_match = getattr(settings, 'SPHINX_EXCERPT_AFTER_MATCH', '')
self._select_related = False
self._select_related_args = dict()
self._extra = dict()
def __get__(self, instance, owner):
if instance is not None or Model not in owner.__bases__:
raise AttributeError, "Search manager is only accessible via a model class"
self._model = owner
if not self.index:
self.index = self._model._meta.db_table
return self
def __len__(self):
self._matches
return self._result.get('total', 0)
def __iter__(self):
return iter(self._matches)
def __getitem__(self, k):
if not isinstance(k, (slice, int)):
raise TypeError
if isinstance(k, slice):
start = k.start or 0
stop = k.stop or 0
if start < 0 or stop < 0:
raise AssertionError("Negative indexing is not supported.")
num = stop - start
if start < self._offset or (start - self._offset) + num > self._limit:
#print "clearing qcache", self._offset, self._limit, start, stop
self._query_cache = None
self._offset = start
self._limit = stop - start
return self.matches
#print self._offset, start, stop, num
#print "[%s:%s]" % (start-self._offset, num)
return self._matches[start-self._offset:stop-self._offset]
else:
if k < 0:
raise AssertionError("Negative indexing is not supported.")
if k < self._offset or k > self._offset + self._limit:
#print "clearing qcache", self._offset, self._limit, k
self._query_cache = None
self._offset = k
self._limit = 1
return self._matches[0]
return self._matches[k - self._offset]
@property
def _matches(self):
if self._query_cache:
return self._query_cache
self._result = self._client.query(self._query, index=self.index, offset=self._offset, limit=self._limit)
qs = self._model.objects.filter(pk__in=[m[0] for m in self._result['matches']])
if self._select_related:
qs = qs.select_related(self._select_related_args)
if self._extra:
qs = qs.extra(**self._extra)
qs = dict((o.id, o) for o in qs)
matches = list()
for id, weight, extra in self._result['matches']:
match = qs.get(id)
if not match:
continue
match.sphinx_weight = weight
matches.append(match)
# build excerpts
if self._build_excerpts:
excerpts = self._client.build_excerpts(
[self._get_excerpt(o) for o in matches],
self.index, self._query,
before_match=self.before_match,
after_match=self.after_match)
if len(excerpts) == len(matches):
for i, e in enumerate(excerpts):
matches[i].sphinx_excerpt = e
self._query_cache = matches
return matches
def query(self, query, build_excerpts=True):
if query == self._query:
return self
#print "new query", query, self._query
self._query = query
self._query_cache = None
if self._get_excerpt and build_excerpts:
self._build_excerpts = True
self._offset = 0
self._limit = 20
return self
def select_related(self, **kw):
self._select_related = True
self._select_related_args.update(**kw)
return self
def extra(self, **kw):
self._extra.update(**kw)
return self
def count(self):
self._matches
return self._result.get('total', 0)