message search (#60)

This commit is contained in:
Sarv
2025-04-19 03:06:05 +08:00
committed by GitHub
parent 85b5465d2a
commit a745519451
19 changed files with 910 additions and 519 deletions

View File

@@ -6,6 +6,7 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"regexp"
"sort"
"strings"
"time"
@@ -17,6 +18,7 @@ import (
"github.com/sjzar/chatlog/internal/errors"
"github.com/sjzar/chatlog/internal/model"
"github.com/sjzar/chatlog/internal/wechatdb/datasource/dbm"
"github.com/sjzar/chatlog/pkg/util"
)
const (
@@ -175,11 +177,16 @@ func (ds *DataSource) getDBInfosForTimeRange(startTime, endTime time.Time) []Mes
return dbs
}
func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.Time, talker string, limit, offset int) ([]*model.Message, error) {
func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.Time, talker string, sender string, keyword string, limit, offset int) ([]*model.Message, error) {
if talker == "" {
return nil, errors.ErrTalkerEmpty
}
log.Debug().Msg(talker)
// 解析talker参数支持多个talker以英文逗号分隔
talkers := util.Str2List(talker, ",")
if len(talkers) == 0 {
return nil, errors.ErrTalkerEmpty
}
// 找到时间范围内的数据库文件
dbInfos := ds.getDBInfosForTimeRange(startTime, endTime)
@@ -187,13 +194,21 @@ func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.T
return nil, errors.TimeRangeNotFound(startTime, endTime)
}
if len(dbInfos) == 1 {
// LIMIT 和 OFFSET 逻辑在单文件情况下可以直接在 SQL 里处理
return ds.getMessagesSingleFile(ctx, dbInfos[0], startTime, endTime, talker, limit, offset)
// 解析sender参数支持多个发送者以英文逗号分隔
senders := util.Str2List(sender, ",")
// 预编译正则表达式如果有keyword
var regex *regexp.Regexp
if keyword != "" {
var err error
regex, err = regexp.Compile(keyword)
if err != nil {
return nil, errors.QueryFailed("invalid regex pattern", err)
}
}
// 从每个相关数据库中查询消息
totalMessages := []*model.Message{}
// 从每个相关数据库中查询消息,并在读取时进行过滤
filteredMessages := []*model.Message{}
for _, dbInfo := range dbInfos {
// 检查上下文是否已取消
@@ -207,183 +222,141 @@ func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.T
continue
}
messages, err := ds.getMessagesFromDB(ctx, db, startTime, endTime, talker)
if err != nil {
log.Err(err).Msgf("从数据库 %s 获取消息失败", dbInfo.FilePath)
continue
}
// 对每个talker进行查询
for _, talkerItem := range talkers {
// 构建表名
_talkerMd5Bytes := md5.Sum([]byte(talkerItem))
talkerMd5 := hex.EncodeToString(_talkerMd5Bytes[:])
tableName := "Msg_" + talkerMd5
totalMessages = append(totalMessages, messages...)
// 检查表是否存在
var exists bool
err = db.QueryRowContext(ctx,
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
tableName).Scan(&exists)
if limit+offset > 0 && len(totalMessages) >= limit+offset {
break
if err != nil {
if err == sql.ErrNoRows {
// 表不存在继续下一个talker
continue
}
return nil, errors.QueryFailed("", err)
}
// 构建查询条件
conditions := []string{"create_time >= ? AND create_time <= ?"}
args := []interface{}{startTime.Unix(), endTime.Unix()}
log.Debug().Msgf("Table name: %s", tableName)
log.Debug().Msgf("Start time: %d, End time: %d", startTime.Unix(), endTime.Unix())
query := fmt.Sprintf(`
SELECT m.sort_seq, m.server_id, m.local_type, n.user_name, m.create_time, m.message_content, m.packed_info_data, m.status
FROM %s m
LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid
WHERE %s
ORDER BY m.sort_seq ASC
`, tableName, strings.Join(conditions, " AND "))
// 执行查询
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
// 如果表不存在SQLite 会返回错误
if strings.Contains(err.Error(), "no such table") {
continue
}
log.Err(err).Msgf("从数据库 %s 查询消息失败", dbInfo.FilePath)
continue
}
// 处理查询结果,在读取时进行过滤
for rows.Next() {
var msg model.MessageV4
err := rows.Scan(
&msg.SortSeq,
&msg.ServerID,
&msg.LocalType,
&msg.UserName,
&msg.CreateTime,
&msg.MessageContent,
&msg.PackedInfoData,
&msg.Status,
)
if err != nil {
rows.Close()
return nil, errors.ScanRowFailed(err)
}
// 将消息转换为标准格式
message := msg.Wrap(talkerItem)
// 应用sender过滤
if len(senders) > 0 {
senderMatch := false
for _, s := range senders {
if message.Sender == s {
senderMatch = true
break
}
}
if !senderMatch {
continue // 不匹配sender跳过此消息
}
}
// 应用keyword过滤
if regex != nil {
plainText := message.PlainTextContent()
if !regex.MatchString(plainText) {
continue // 不匹配keyword跳过此消息
}
}
// 通过所有过滤条件,保留此消息
filteredMessages = append(filteredMessages, message)
// 检查是否已经满足分页处理数量
if limit > 0 && len(filteredMessages) >= offset+limit {
// 已经获取了足够的消息,可以提前返回
rows.Close()
// 对所有消息按时间排序
sort.Slice(filteredMessages, func(i, j int) bool {
return filteredMessages[i].Seq < filteredMessages[j].Seq
})
// 处理分页
if offset >= len(filteredMessages) {
return []*model.Message{}, nil
}
end := offset + limit
if end > len(filteredMessages) {
end = len(filteredMessages)
}
return filteredMessages[offset:end], nil
}
}
rows.Close()
}
}
// 对所有消息按时间排序
sort.Slice(totalMessages, func(i, j int) bool {
return totalMessages[i].Seq < totalMessages[j].Seq
sort.Slice(filteredMessages, func(i, j int) bool {
return filteredMessages[i].Seq < filteredMessages[j].Seq
})
// 处理分页
if limit > 0 {
if offset >= len(totalMessages) {
if offset >= len(filteredMessages) {
return []*model.Message{}, nil
}
end := offset + limit
if end > len(totalMessages) {
end = len(totalMessages)
if end > len(filteredMessages) {
end = len(filteredMessages)
}
return totalMessages[offset:end], nil
return filteredMessages[offset:end], nil
}
return totalMessages, nil
}
// getMessagesSingleFile 从单个数据库文件获取消息
func (ds *DataSource) getMessagesSingleFile(ctx context.Context, dbInfo MessageDBInfo, startTime, endTime time.Time, talker string, limit, offset int) ([]*model.Message, error) {
db, err := ds.dbm.OpenDB(dbInfo.FilePath)
if err != nil {
return nil, errors.DBConnectFailed(dbInfo.FilePath, nil)
}
// 构建表名
_talkerMd5Bytes := md5.Sum([]byte(talker))
talkerMd5 := hex.EncodeToString(_talkerMd5Bytes[:])
tableName := "Msg_" + talkerMd5
// 检查表是否存在
var exists bool
err = db.QueryRowContext(ctx,
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
tableName).Scan(&exists)
if err != nil {
if err == sql.ErrNoRows {
// 表不存在,返回空结果
return []*model.Message{}, nil
}
return nil, errors.QueryFailed("", err)
}
// 构建查询条件
conditions := []string{"create_time >= ? AND create_time <= ?"}
args := []interface{}{startTime.Unix(), endTime.Unix()}
query := fmt.Sprintf(`
SELECT m.sort_seq, m.server_id, m.local_type, n.user_name, m.create_time, m.message_content, m.packed_info_data, m.status
FROM %s m
LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid
WHERE %s
ORDER BY m.sort_seq ASC
`, tableName, strings.Join(conditions, " AND "))
if limit > 0 {
query += fmt.Sprintf(" LIMIT %d", limit)
if offset > 0 {
query += fmt.Sprintf(" OFFSET %d", offset)
}
}
// 执行查询
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.QueryFailed(query, err)
}
defer rows.Close()
// 处理查询结果
messages := []*model.Message{}
for rows.Next() {
var msg model.MessageV4
err := rows.Scan(
&msg.SortSeq,
&msg.ServerID,
&msg.LocalType,
&msg.UserName,
&msg.CreateTime,
&msg.MessageContent,
&msg.PackedInfoData,
&msg.Status,
)
if err != nil {
return nil, errors.ScanRowFailed(err)
}
messages = append(messages, msg.Wrap(talker))
}
return messages, nil
}
// getMessagesFromDB 从数据库获取消息
func (ds *DataSource) getMessagesFromDB(ctx context.Context, db *sql.DB, startTime, endTime time.Time, talker string) ([]*model.Message, error) {
// 构建表名
_talkerMd5Bytes := md5.Sum([]byte(talker))
talkerMd5 := hex.EncodeToString(_talkerMd5Bytes[:])
tableName := "Msg_" + talkerMd5
// 检查表是否存在
var exists bool
err := db.QueryRowContext(ctx,
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
tableName).Scan(&exists)
if err != nil {
if err == sql.ErrNoRows {
// 表不存在,返回空结果
return []*model.Message{}, nil
}
return nil, errors.QueryFailed("", err)
}
// 构建查询条件
conditions := []string{"create_time >= ? AND create_time <= ?"}
args := []interface{}{startTime.Unix(), endTime.Unix()}
query := fmt.Sprintf(`
SELECT m.sort_seq, m.server_id, m.local_type, n.user_name, m.create_time, m.message_content, m.packed_info_data, m.status
FROM %s m
LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid
WHERE %s
ORDER BY m.sort_seq ASC
`, tableName, strings.Join(conditions, " AND "))
// 执行查询
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
// 如果表不存在SQLite 会返回错误
if strings.Contains(err.Error(), "no such table") {
return []*model.Message{}, nil
}
return nil, errors.QueryFailed(query, err)
}
defer rows.Close()
// 处理查询结果
messages := []*model.Message{}
for rows.Next() {
var msg model.MessageV4
err := rows.Scan(
&msg.SortSeq,
&msg.ServerID,
&msg.LocalType,
&msg.UserName,
&msg.CreateTime,
&msg.MessageContent,
&msg.PackedInfoData,
&msg.Status,
)
if err != nil {
return nil, errors.ScanRowFailed(err)
}
messages = append(messages, msg.Wrap(talker))
}
return messages, nil
return filteredMessages, nil
}
// 联系人