package main

import (
	"database/sql"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"

	"git.jingrow.com/go-mysql-org/go-mysql/replication"
	_ "git.jingrow.com/marcboeker/go-duckdb/v2"
	"git.jingrow.com/parquet-go/parquet-go"
	"git.jingrow.com/parquet-go/parquet-go/compress/snappy"
	"git.jingrow.com/parquet-go/parquet-go/compress/zstd"

	"vitess.io/vitess/go/vt/sqlparser"
)

const CREATE_TABLE_SQL string = `
CREATE TABLE IF NOT EXISTS query (
	binlog VARCHAR,
	db_name VARCHAR,
	table_name VARCHAR,
	timestamp INTEGER,
	type VARCHAR,
	row_id INTEGER,
	event_size INTEGER
)
`

const INSERT_QUERY_SQL string = "INSERT INTO query (binlog, db_name, table_name, timestamp, type, row_id, event_size) VALUES "

type Query struct {
	Timestamp uint32
	Metadata  SQLSourceMetadata
	RowId     int32
	EventSize uint32
	SQL       string
}

type BinlogIndexer struct {
	BatchSize       int
	binlogName      string
	binlogPath      string
	compressionMode string // "low-memory", "balanced", or "high-compression"

	// State
	queries         []Query
	currentRowId    int32
	estimatedMemory int64 // Track estimated memory usage in bytes

	// Internal
	db                       *sql.DB
	fw                       *os.File
	pw                       *parquet.GenericWriter[ParquetRow]
	parquetBuffer            []ParquetRow
	parser                   *replication.BinlogParser
	sqlParser                *sqlparser.Parser
	isClosed                 bool
	sqlStringBuilderForFlush strings.Builder

	// String interning for memory optimization
	stringCache map[string]string

	// AnnotateRowsEvent parsing related
	annotateRowsEvent          *replication.MariadbAnnotateRowsEvent
	annotateRowsEventTimestamp uint32
	tableMapEvents             [][]string // list of [database, table]
	annotateRowsEventSize      uint32     // actual event + related events (table map + write rows)
}

type ParquetRow struct {
	Id    int32  `parquet:"id, type=INT32"`
	Query string `parquet:"query, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
}

//export NewBinlogIndexer
func NewBinlogIndexer(basePath string, binlogPath string, databaseFilename string, compressionMode string) (*BinlogIndexer, error) {
	if _, err := os.Stat(basePath); os.IsNotExist(err) {
		return nil, fmt.Errorf("base path does not exist: %w", err)
	}
	binlogFilename := filepath.Base(binlogPath)

	// Configure DuckDB memory based on compression mode
	var duckdbMemoryLimit string
	switch compressionMode {
	case "low-memory":
		duckdbMemoryLimit = "15MB" // Aggressive limit for <50MB target
	case "high-compression":
		duckdbMemoryLimit = "512MB" // Large memory allowance for 1GB target
	default: // "balanced"
		duckdbMemoryLimit = "50MB" // Moderate limit
	}

	dbPath := filepath.Join(basePath, databaseFilename) + "?memory_limit=" + duckdbMemoryLimit + "&threads=1"
	db, err := sql.Open("duckdb", dbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	// Create SQL parser
	sqlParser, err := sqlparser.New(sqlparser.Options{})
	if err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("failed to create sql parser: %w", err)
	}

	// Create table
	_, err = db.Exec(CREATE_TABLE_SQL)
	if err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("failed to create table: %w", err)
	}

	/*
	 * Delete the current binlog's data (if it exists).
	 * Parquet files don't support appending, so we have to remove the
	 * existing data for this binlog before indexing it again.
	 */
	_, err = db.Exec("DELETE FROM query WHERE binlog = ?", binlogFilename)
	if err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("failed to delete binlog data: %w", err)
	}

	parquetFilepath := filepath.Join(basePath, fmt.Sprintf("queries_%s.parquet", binlogFilename))
	_ = os.Remove(parquetFilepath)

	// Create file writer
	parquetFile, err := os.Create(parquetFilepath)
	if err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("failed to create parquet file: %w", err)
	}
	var parquetOptions []parquet.WriterOption

	switch compressionMode {
	case "low-memory":
		// Snappy: ~5MB compression buffer, minimal page buffers
		parquetOptions = []parquet.WriterOption{
			parquet.Compression(&snappy.Codec{}),
			parquet.PageBufferSize(512 * 1024), // 512KB
			parquet.MaxRowsPerRowGroup(100),    // Flush every 100 rows
			parquet.DataPageStatistics(false),
		}
	case "high-compression":
		// ZSTD max compression: large buffers for maximum compression ratio
		parquetOptions = []parquet.WriterOption{
			parquet.Compression(&zstd.Codec{Level: zstd.SpeedBestCompression}),
			parquet.PageBufferSize(64 * 1024 * 1024), // 64MB page buffer
			parquet.MaxRowsPerRowGroup(50000),        // Very large row groups
			parquet.DataPageStatistics(true),         // Enable statistics for better compression
			parquet.ColumnPageBuffers(
				parquet.NewFileBufferPool(basePath, "tmp_buffers.*"), // File-backed buffers
			),
		}
	default: // "balanced" mode
		// ZSTD default: good compression, moderate buffers
		parquetOptions = []parquet.WriterOption{
			parquet.Compression(&zstd.Codec{Level: zstd.SpeedDefault}),
			parquet.PageBufferSize(2 * 1024 * 1024), // 2MB
			parquet.MaxRowsPerRowGroup(1000),        // Flush every 1000 rows
			parquet.DataPageStatistics(false),
		}
	}

	parquetWriter := parquet.NewGenericWriter[ParquetRow](parquetFile, parquetOptions...)

	// Adjust batch size and allocations based on compression mode
	var actualBatchSize int
	var builderSize int
	var queriesCapacity int

	switch compressionMode {
	case "low-memory":
		actualBatchSize = 100   // Small batches
		builderSize = 100 * 100 // 10KB
		queriesCapacity = 100
	case "high-compression":
		actualBatchSize = 10000   // Very large batches - accumulate more data
		builderSize = 10000 * 100 // 1MB
		queriesCapacity = 10000
	default: // "balanced"
		actualBatchSize = 1000   // Medium batches
		builderSize = 1000 * 100 // 100KB
		queriesCapacity = 1000
	}

	sqlStringBuilderForFlush := strings.Builder{}
	sqlStringBuilderForFlush.Grow(builderSize)

	return &BinlogIndexer{
		BatchSize:                actualBatchSize,
		binlogName:               binlogFilename,
		binlogPath:               binlogPath,
		compressionMode:          compressionMode,
		queries:                  make([]Query, 0, queriesCapacity),
		currentRowId:             1,
		estimatedMemory:          0,
		db:                       db,
		fw:                       parquetFile,
		pw:                       parquetWriter,
		parser:                   replication.NewBinlogParser(),
		sqlParser:                sqlParser,
		sqlStringBuilderForFlush: sqlStringBuilderForFlush,
		isClosed:                 false,
		stringCache:              make(map[string]string),
		tableMapEvents:           make([][]string, 0, 4), // Most queries affect 1-4 tables
	}, nil
}

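// Typical usage (a minimal sketch; the base path, binlog path, database
// filename, and error handling below are hypothetical placeholders):
//
//	indexer, err := NewBinlogIndexer("/var/lib/binlog-index", "/var/lib/mysql/mysql-bin.000001", "binlog_index.db", "balanced")
//	if err != nil {
//		// handle error
//	}
//	// Start parses the whole binlog, flushes batches to DuckDB and the
//	// Parquet file, and closes the indexer when it is done.
//	if err := indexer.Start(); err != nil {
//		// handle error
//	}
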
// internString deduplicates strings to reduce memory usage.
// Database and table names are repeated frequently across events.
func (p *BinlogIndexer) internString(s string) string {
	if s == "" {
		return ""
	}
	if interned, exists := p.stringCache[s]; exists {
		return interned
	}
	p.stringCache[s] = s
	return s
}

// bytesToString converts []byte to a string and interns the result.
func (p *BinlogIndexer) bytesToString(b []byte) string {
	return p.internString(string(b))
}

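// Illustrative effect of interning (hypothetical name): if "orders" appears as
// the table name in thousands of table map events, bytesToString hands back the
// single cached "orders" string each time instead of retaining a fresh copy per
// event.
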
// detectQueryType quickly detects the SQL statement type from its prefix.
// This is much faster than full SQL parsing - it uses byte comparison.
func detectQueryType(sql string) StatementType {
	// Skip leading whitespace
	i := 0
	for i < len(sql) && (sql[i] == ' ' || sql[i] == '\t' || sql[i] == '\n' || sql[i] == '\r') {
		i++
	}

	if i >= len(sql) {
		return Other
	}

	// Fast byte-level comparison (case-insensitive).
	// Check the first character to quickly filter.
	firstChar := sql[i] | 0x20 // Convert to lowercase
	switch firstChar {
	case 's': // SELECT
		if i+6 <= len(sql) {
			word := sql[i : i+6]
			if strings.EqualFold(word, "SELECT") {
				return Select
			}
		}
	case 'i': // INSERT
		if i+6 <= len(sql) {
			word := sql[i : i+6]
			if strings.EqualFold(word, "INSERT") {
				return Insert
			}
		}
	case 'u': // UPDATE
		if i+6 <= len(sql) {
			word := sql[i : i+6]
			if strings.EqualFold(word, "UPDATE") {
				return Update
			}
		}
	case 'd': // DELETE
		if i+6 <= len(sql) {
			word := sql[i : i+6]
			if strings.EqualFold(word, "DELETE") {
				return Delete
			}
		}
	}

	return Other
}

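// Illustrative behaviour of detectQueryType (hypothetical inputs):
//
//	detectQueryType("  SELECT 1")                // Select
//	detectQueryType("insert into t values (1)")  // Insert
//	detectQueryType("BEGIN")                     // Other (no matching prefix)

// Start parses the binlog file from the beginning, commits any pending
// annotate-rows event, flushes the final batch, and closes the indexer.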
func (p *BinlogIndexer) Start() error {
	err := p.parser.ParseFile(p.binlogPath, 0, p.onBinlogEvent)
	if err != nil {
		return fmt.Errorf("failed to parse binlog: %w", err)
	}
	// Commit any pending annotate rows event
	p.commitAnnotateRowsEvent()
	// Flush the last batch
	err = p.flush()
	if err != nil {
		return fmt.Errorf("failed to flush: %w", err)
	}
	// Close everything
	p.Close()
	return nil
}

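// onBinlogEvent is the per-event callback passed to ParseFile. Query events are
// indexed directly; MariaDB annotate-rows, table map, and v1 rows events are
// buffered and committed together once an unrelated event arrives.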
func (p *BinlogIndexer) onBinlogEvent(e *replication.BinlogEvent) error {
	commitAnnotateRowsEvent := false
	switch e.Header.EventType {
	case replication.MARIADB_ANNOTATE_ROWS_EVENT:
		if event, ok := e.Event.(*replication.MariadbAnnotateRowsEvent); ok {
			p.annotateRowsEvent = event
			p.annotateRowsEventSize = e.Header.EventSize
			p.annotateRowsEventTimestamp = e.Header.Timestamp
		}
	case replication.TABLE_MAP_EVENT:
		if event, ok := e.Event.(*replication.TableMapEvent); ok {
			p.tableMapEvents = append(p.tableMapEvents, []string{
				p.bytesToString(event.Schema),
				p.bytesToString(event.Table),
			})
			p.annotateRowsEventSize += e.Header.EventSize
		}
	case replication.WRITE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv1:
		if _, ok := e.Event.(*replication.RowsEvent); ok {
			p.annotateRowsEventSize += e.Header.EventSize
		}
	case replication.QUERY_EVENT:
		if event, ok := e.Event.(*replication.QueryEvent); ok {
			sqlQuery := string(event.Query)
			schema := p.bytesToString(event.Schema)

			q := Query{
				Timestamp: e.Header.Timestamp,
				RowId:     p.currentRowId,
				EventSize: e.Header.EventSize,
				SQL:       sqlQuery,
			}

			queryType := detectQueryType(sqlQuery)
			q.Metadata = *p.ExtractSQLMetadata(queryType, sqlQuery, schema)

			p.estimatedMemory += int64(len(sqlQuery) + 32)
			p.queries = append(p.queries, q)
			p.currentRowId += 1
			p.flushIfNeeded()
		}
		commitAnnotateRowsEvent = true
	default:
		commitAnnotateRowsEvent = true
	}

	if commitAnnotateRowsEvent {
		p.commitAnnotateRowsEvent()
	}

	return nil
}

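// commitAnnotateRowsEvent converts the buffered MariaDB annotate-rows event and
// the table map events collected after it into a single Query entry (reusing the
// table map instead of parsing the SQL), then resets the buffered state. It is a
// no-op when no annotate-rows event or no table map is pending.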
func (p *BinlogIndexer) commitAnnotateRowsEvent() {
	if p.annotateRowsEvent == nil {
		return
	}

	if len(p.tableMapEvents) == 0 {
		return
	}

	sqlQuery := string(p.annotateRowsEvent.Query)
	// For annotate rows events, we already have table info from TABLE_MAP_EVENT.
	// Skip expensive SQL parsing and use the table map directly.
	metadata := SQLSourceMetadata{
		Type:   detectQueryType(sqlQuery), // Fast prefix detection, no parsing
		Tables: make([]*SQLTable, 0, len(p.tableMapEvents)),
	}
	for _, table := range p.tableMapEvents {
		metadata.Tables = append(metadata.Tables, &SQLTable{
			Database: table[0],
			Table:    table[1],
		})
	}

	query := Query{
		Timestamp: p.annotateRowsEventTimestamp,
		Metadata:  metadata,
		RowId:     p.currentRowId,
		EventSize: p.annotateRowsEventSize,
		SQL:       sqlQuery,
	}

	// Track memory usage
	p.estimatedMemory += int64(len(sqlQuery) + 32) // SQL + struct overhead
	p.queries = append(p.queries, query)
	p.currentRowId += 1

	// reset
	p.annotateRowsEvent = nil
	p.annotateRowsEventSize = 0
	p.annotateRowsEventTimestamp = 0
	p.tableMapEvents = p.tableMapEvents[:0]

	p.flushIfNeeded()
}

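// flushIfNeeded flushes the current batch once it reaches BatchSize entries or
// exceeds the mode-specific memory budget, whichever happens first.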
func (p *BinlogIndexer) flushIfNeeded() {
	// Memory threshold varies by compression mode
	var maxMemoryBytes int64
	switch p.compressionMode {
	case "low-memory":
		maxMemoryBytes = 2 * 1024 * 1024 // 2MB - very aggressive flushing
	case "high-compression":
		maxMemoryBytes = 100 * 1024 * 1024 // 100MB - accumulate more for better compression
	default: // "balanced"
		maxMemoryBytes = 10 * 1024 * 1024 // 10MB - moderate flushing
	}

	if len(p.queries) >= p.BatchSize || p.estimatedMemory > maxMemoryBytes {
		err := p.flush()
		if err != nil {
			fmt.Printf("[WARN] failed to flush: %v\n", err)
		}
	}
}

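// flush writes the buffered queries to DuckDB as one multi-row INSERT (values
// are concatenated as literals, one row per referenced table) and appends the
// raw SQL text to the Parquet file, then resets the batch state. With purely
// illustrative values, the generated statement looks roughly like:
//
//	INSERT INTO query (binlog, db_name, table_name, timestamp, type, row_id, event_size) VALUES
//	('mysql-bin.000001', 'shop', 'orders', 1700000000, '<type>', 1, 245),
//	('mysql-bin.000001', 'shop', 'customers', 1700000003, '<type>', 2, 312)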
func (p *BinlogIndexer) flush() error {
	// Create transaction
	tx, err := p.db.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin db transaction: %w", err)
	}

	defer func() {
		// In case of error, roll back the changes
		_ = tx.Rollback()
	}()

	// Write the queries to the db using strings.Builder for efficiency.
	// Estimate size: each row is ~100 bytes on average.

	// Count total rows for pre-allocation
	totalRows := 0
	for i := range p.queries {
		totalRows += len(p.queries[i].Metadata.Tables)
	}

	// Reset the string builder
	p.sqlStringBuilderForFlush.Reset()

	p.sqlStringBuilderForFlush.Grow(totalRows * 100) // Pre-allocate to avoid reallocations

	first := true
	if len(p.queries) == 0 {
		return nil
	}

	// Write the query template at the start
	p.sqlStringBuilderForFlush.WriteString(INSERT_QUERY_SQL)

	// Build the values
	for i := range p.queries {
		query := &p.queries[i] // avoid copy

		for j := range query.Metadata.Tables {
			table := query.Metadata.Tables[j] // avoid copy

			if !first {
				p.sqlStringBuilderForFlush.WriteByte(',')
			}
			first = false

			// Manually build the string to avoid fmt.Sprintf allocations
			p.sqlStringBuilderForFlush.WriteString("('")
			p.sqlStringBuilderForFlush.WriteString(p.binlogName)
			p.sqlStringBuilderForFlush.WriteString("', '")
			p.sqlStringBuilderForFlush.WriteString(table.Database)
			p.sqlStringBuilderForFlush.WriteString("', '")
			p.sqlStringBuilderForFlush.WriteString(table.Table)
			p.sqlStringBuilderForFlush.WriteString("', ")
			p.sqlStringBuilderForFlush.WriteString(itoa(int(query.Timestamp)))
			p.sqlStringBuilderForFlush.WriteString(", '")
			p.sqlStringBuilderForFlush.WriteString(string(query.Metadata.Type))
			p.sqlStringBuilderForFlush.WriteString("', ")
			p.sqlStringBuilderForFlush.WriteString(itoa(int(query.RowId)))
			p.sqlStringBuilderForFlush.WriteString(", ")
			p.sqlStringBuilderForFlush.WriteString(itoa(int(query.EventSize)))
			p.sqlStringBuilderForFlush.WriteByte(')')
		}
	}

	// Insert the queries
	_, err = tx.Exec(p.sqlStringBuilderForFlush.String())
	if err != nil {
		return fmt.Errorf("failed to insert queries: %w", err)
	}

	// Write to the parquet file as a bulk insert
	if cap(p.parquetBuffer) < len(p.queries) {
		p.parquetBuffer = make([]ParquetRow, len(p.queries))
	} else {
		p.parquetBuffer = p.parquetBuffer[:len(p.queries)]
	}

	// Fill buffer
	for i := range p.queries {
		p.parquetBuffer[i].Id = p.queries[i].RowId
		p.parquetBuffer[i].Query = p.queries[i].SQL
	}

	// Batch write
	if _, err = p.pw.Write(p.parquetBuffer); err != nil {
		return err
	}

	// Commit the transaction
	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("failed to commit db transaction: %w", err)
	}

	// Clear the queries - reuse slice capacity if reasonable
	p.estimatedMemory = 0
	p.queries = p.queries[:0]

	// If the slice grew too large, reset it to avoid holding on to too much memory
	if cap(p.queries) > p.BatchSize*2 {
		p.queries = make([]Query, 0, p.BatchSize)
	}

	// Clear the string cache based on compression mode
	switch p.compressionMode {
	case "low-memory":
		// Aggressive cache clearing for low-memory mode
		if len(p.stringCache) > 1000 {
			p.stringCache = make(map[string]string, 100)
		}
		// Force GC to release memory immediately - acceptable since speed is not the priority
		runtime.GC()
	case "high-compression":
		// Never clear the cache in high-compression mode - maximize reuse.
		// No manual GC - let Go handle it naturally.
	default: // "balanced"
		// More relaxed cache clearing for balanced mode
		if len(p.stringCache) > 10000 {
			p.stringCache = make(map[string]string, 1000)
		}
		// No manual GC in balanced mode - let Go handle it
	}

	return nil
}

func (p *BinlogIndexer) Close() {
	if p.isClosed {
		return
	}
	// Do a final flush
	if err := p.flush(); err != nil {
		fmt.Printf("[WARN] failed to flush: %v\n", err)
	}
	// Try to stop the parquet writer
	if err := p.pw.Close(); err != nil {
		fmt.Printf("[WARN] failed to stop parquet writer: %v\n", err)
	}

	// Try to close the parquet file
	if err := p.fw.Close(); err != nil {
		fmt.Printf("[WARN] failed to close parquet file: %v\n", err)
	}
	// Try to close the db
	if err := p.db.Close(); err != nil {
		fmt.Printf("[WARN] failed to close db: %v\n", err)
	}

	// Free memory
	p.queries = nil
	p.tableMapEvents = nil
	p.stringCache = nil

	p.isClosed = true
}

//export RemoveBinlogIndex
func RemoveBinlogIndex(basePath string, binlogPath string, databaseFilename string) error {
	binlogFilename := filepath.Base(binlogPath)

	db, err := sql.Open("duckdb", filepath.Join(basePath, databaseFilename))
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}

	defer func() {
		_ = db.Close()
	}()

	// drop binlog rows
	_, err = db.Exec("DELETE FROM query WHERE binlog = ?", binlogFilename)
	if err != nil {
		return fmt.Errorf("failed to delete binlog data: %w", err)
	}

	// drop parquet file
	parquetFilepath := filepath.Join(basePath, fmt.Sprintf("queries_%s.parquet", binlogFilename))
	_ = os.Remove(parquetFilepath)
	return nil
}

// itoa converts an integer to its decimal string form for SQL building
// (a thin wrapper around strconv.Itoa).
func itoa(i int) string {
	return strconv.Itoa(i)
}