.. Automatically generated by example2rst.py. Do not edit this file .. currentmodule:: apsw Example/Tour ============ This code demonstrates usage of APSW. It gives you a good overview of all the things that can be done. Also included is output so you can see what gets printed when you run the code. .. code-block:: python #!/usr/bin/env python3 from __future__ import annotations import os import sys import time import apsw import random # Note: this code uses Python's optional typing annotations. You can # ignore them and do not need to use them from typing import Optional, Iterator, Tuple .. index:: Checking APSW and SQLite versions (example code) .. _example_version_check: Checking APSW and SQLite versions --------------------------------- .. code-block:: python # Where the extension module is on the filesystem print(" Using APSW file", apsw.__file__) # From the extension print(" APSW version", apsw.apswversion()) # From the sqlite header file at APSW compile time print("SQLite header version", apsw.SQLITE_VERSION_NUMBER) # The SQLite code running print(" SQLite lib version", apsw.sqlitelibversion()) # If True then SQLite is incorporated into the extension. # If False then a shared library is being used, or static linking print(" Using amalgamation", apsw.using_amalgamation) .. code-block:: output Using APSW file /space/apsw/./apsw/__init__.cpython-310-x86_64-linux-gnu.so APSW version 3.40.0.0 SQLite header version 3040000 SQLite lib version 3.40.0 Using amalgamation True .. index:: Opening the database (example code) .. _example_open_db: Opening the database -------------------- You open the database by using :class:`Connection` .. code-block:: python # Default will create the database if it doesn't exist connection = apsw.Connection("dbfile") # Open existing read-only connection = apsw.Connection("dbfile", flags=apsw.SQLITE_OPEN_READONLY) # Open existing read-write (exception if it doesn't exist) connection = apsw.Connection("dbfile", flags=apsw.SQLITE_OPEN_READWRITE) .. index:: Executing SQL (example code) .. _example_executing_sql: Executing SQL ------------- Use :meth:`Connection.execute` to execute SQL .. code-block:: python connection.execute("create table point(x,y,z)") connection.execute("insert into point values(1, 2, 3)") # You can use multiple ; separated statements connection.execute(""" insert into point values(4, 5, 6); create table log(timestamp, event); create table foo(a, b, c); create table important(secret, data); """) # read rows for row in connection.execute("select * from point"): print(row) .. code-block:: output (1, 2, 3) (4, 5, 6) .. index:: Why you use bindings to provide values (example code) .. _example_why_bindings: Why you use bindings to provide values -------------------------------------- It is tempting to compose strings with the values in them, but it is easy to mangle the query especially if values contain punctuation and unicode. It is known as `SQL injection `__. Bindings are the correct way to supply values to queries. .. code-block:: python # a simple value event = "system started" # DO NOT DO THIS query = "insert into log values(0, '" + event + "')" print("query:", query) # BECAUSE ... a bad guy could provide a value like this event = "bad guy here') ; drop table important; -- " # which has effects like this query = "insert into log values(0, '" + event + "')" print("bad guy:", query) .. code-block:: output query: insert into log values(0, 'system started') bad guy: insert into log values(0, 'bad guy here') ; drop table important; -- ') .. index:: Bindings (sequence) (example code) .. _example_bindings_sequence: Bindings (sequence) ------------------- Bindings can be provided as a sequence such as with a tuple or list. Use **?** to show where the values go. .. code-block:: python query = "insert into log values(?, ?)" data = (7, "restart") connection.execute(query, data) # You can also use numbers after the ? to select # values from the sequence. Note that numbering # starts at 1 query = "select ?1, ?3, ?2" data = ("alpha", "beta", "gamma") for row in connection.execute(query, data): print(row) .. code-block:: output ('alpha', 'gamma', 'beta') .. index:: Bindings (dict) (example code) .. _example_bindings_dict: Bindings (dict) --------------- You can also supply bindings with a dictionary. Use **:NAME**, **@NAME**, or **$NAME**, to provide the key name in the query. Names are case sensitive. .. code-block:: python query = "insert into point values(:x, @Y, $z)" data = {"x": 7, "Y": 8, "z": 9} connection.execute(query, data) .. index:: Using different types (example code) .. _example_types: Using different types --------------------- SQLite supports None, int, float, str, bytes (binary data). If a table declaration gives a type then SQLite attempts conversion. `Read more `__. .. code-block:: python connection.execute(""" create table types1(a, b, c, d, e); create table types2(a INTEGER, b REAL, c TEXT, d, e BLOB); """) data = ("12", 3, 4, 5.5, b"\x03\x72\xf4\x00\x9e") connection.execute("insert into types1 values(?,?,?,?,?)", data) connection.execute("insert into types2 values(?,?,?,?,?)", data) for row in connection.execute("select * from types1"): print("types1", repr(row)) for row in connection.execute("select * from types2"): print("types2", repr(row)) .. code-block:: output types1 ('12', 3, 4, 5.5, b'\x03r\xf4\x00\x9e') types2 (12, 3.0, '4', 5.5, b'\x03r\xf4\x00\x9e') .. index:: Transactions (example code) .. _example_transaction: Transactions ------------ By default each statement is its own transaction (3 in the example below). A transaction finishes by flushing data to storage and waiting for the operating system to confirm it is permanently there (ie will survive a power failure) which takes a while. .. code-block:: python connection.execute("insert into point values(2, 2, 2)") connection.execute("insert into point values(3, 3, 3)") connection.execute("insert into point values(4, 4, 4)") # You can use BEGIN / END to manually make a transaction connection.execute("BEGIN") connection.execute("insert into point values(2, 2, 2)") connection.execute("insert into point values(3, 3, 3)") connection.execute("insert into point values(4, 4, 4)") connection.execute("END") # Or use `with`` that does it automatically with connection: connection.execute("insert into point values(2, 2, 2)") connection.execute("insert into point values(3, 3, 3)") connection.execute("insert into point values(4, 4, 4)") # Nested transactions are supported with connection: connection.execute("insert into point values(2, 2, 2)") with connection: connection.execute("insert into point values(3, 3, 3)") connection.execute("insert into point values(4, 4, 4)") .. index:: executemany (example code) .. _example_executemany: executemany ----------- You can execute the same SQL against a sequence using :meth:`Connection.executemany` .. code-block:: python data = ( (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), ) query = "insert into point values(?,?,?)" # we do it in a transaction with connection: # the query is run for each item in data connection.executemany(query, data) .. index:: Tracing execution (example code) .. _example_exectrace: Tracing execution ----------------- You can trace execution of SQL statements. See :ref:`more about tracing `. .. code-block:: python def my_tracer(cursor: apsw.Cursor, statement: str, bindings: Optional[apsw.Bindings]) -> bool: "Called just before executing each statement" print("SQL:", statement.strip()) print("Bindings:", bindings) return True # if you return False then execution is aborted # you can trace a single cursor cursor = connection.cursor() cursor.exectrace = my_tracer cursor.execute( """ drop table if exists bar; create table bar(x,y,z); select * from point where x=?; """, (3, )) # if set on a connection then all cursors are traced connection.exectrace = my_tracer # and clearing it connection.exectrace = None .. code-block:: output SQL: drop table if exists bar; Bindings: () SQL: create table bar(x,y,z); Bindings: () SQL: select * from point where x=?; Bindings: (3,) .. index:: Tracing returned rows (example code) .. _example_rowtrace: Tracing returned rows --------------------- You can trace returned rows, including modifying what is returned or skipping it completely. See :ref:`more about tracing `. .. code-block:: python def row_tracer(cursor: apsw.Cursor, row: apsw.SQLiteValues) -> apsw.SQLiteValues: """Called with each row of results before they are handed off. You can return None to cause the row to be skipped or a different set of values to return""" print("Row:", row) return row # you can trace a single cursor cursor = connection.cursor() cursor.rowtrace = row_tracer for row in cursor.execute("select x,y from point where x>4"): pass # if set on a connection then all cursors are traced connection.rowtrace = row_tracer # and clearing it connection.rowtrace = None .. code-block:: output Row: (7, 8) Row: (5, 5) .. index:: Defining your own functions (example code) .. _example_scalar: Defining your own functions --------------------------- Scalar functions take one or more values and return one value. They are registered by calling :meth:`Connection.createscalarfunction`. .. code-block:: python def ilove7(*args: apsw.SQLiteValue) -> int: "A scalar function" print(f"ilove7 got { args } but I love 7") return 7 connection.createscalarfunction("seven", ilove7) for row in connection.execute("select seven(x,y) from point where x>4"): print("row", row) .. code-block:: output ilove7 got (7, 8) but I love 7 row (7,) ilove7 got (5, 5) but I love 7 row (7,) .. index:: Defining aggregate functions (example code) .. _example_aggregate: Defining aggregate functions ---------------------------- Aggregate functions are called multiple times with matching rows, and then provide a final value. An example is calculating an average. They are registered by calling :meth:`Connection.createaggregatefunction`. .. code-block:: python class longest: # Find which value when represented as a string is # the longest def __init__(self) -> None: self.longest = "" def step(self, *args: apsw.SQLiteValue) -> None: # Called with each matching row for arg in args: if len(str(arg)) > len(self.longest): self.longest = str(arg) def final(self) -> str: # Called at the very end return self.longest @classmethod def factory(cls) -> apsw.AggregateCallbacks: return cls(), cls.step, cls.final connection.createaggregatefunction("longest", longest.factory) for row in connection.execute("select longest(event) from log"): print(row) .. code-block:: output ('restart',) .. index:: Defining collations (sorting) (example code) .. _example_collation: Defining collations (sorting) ----------------------------- How you sort can depend on the languages or values involved. You register a collation by calling :meth:`Connection.createcollation`. .. code-block:: python # This example sorting mechanisms understands some text followed by a # number and ensures the number portion gets sorted correctly connection.execute("create table names(name)") connection.executemany("insert into names values(?)", ( ("file1", ), ("file7", ), ("file17", ), ("file20", ), ("file3", ), )) print("Standard sorting") for row in connection.execute("select * from names order by name"): print(row) def str_num_collate(s1: apsw.SQLiteValue, s2: apsw.SQLiteValue) -> int: # return -1 if s1s2 else 0 def parts(v: str) -> tuple[str, int]: num = "" while v and v[-1].isdigit(): num = v[-1] + num v = v[:-1] return v, int(num) if num else 0 ps1 = parts(str(s1)) ps2 = parts(str(s2)) # compare if ps1 < ps2: return -1 if ps1 > ps2: return 1 return 0 connection.createcollation("strnum", str_num_collate) print() print("Using strnum") for row in connection.execute("select * from names order by name collate strnum"): print(row) .. code-block:: output Standard sorting ('file1',) ('file17',) ('file20',) ('file3',) ('file7',) Using strnum ('file1',) ('file3',) ('file7',) ('file17',) ('file20',) .. index:: Accessing results by column name (example code) .. _example_colnames: Accessing results by column name -------------------------------- You can access results by column name using :mod:`dataclasses`. APSW provides :class:`apsw.ext.DataClassRowFactory` for names instead .. code-block:: python import apsw.ext connection.execute(""" create table books(id, title, author, year); insert into books values(7, "Animal Farm", "George Orwell", 1945); insert into books values(37, "The Picture of Dorian Gray", "Oscar Wilde", 1890); """) # Normally you use column numbers for row in connection.execute("select title, id, year from books where author=?", ("Oscar Wilde", )): # this is very fragile print("title", row[0]) print("id", row[1]) print("year", row[2]) # Turn on dataclasses - frozen makes them read-only connection.rowtrace = apsw.ext.DataClassRowFactory(dataclass_kwargs={"frozen": True}) print("\nNow with dataclasses\n") # Same query - note using AS to set column name for row in connection.execute( """SELECT title, id AS book_id, year AS book_year FROM books WHERE author = ?""", ("Oscar Wilde", )): print("title", row.title) print("id", row.book_id) print("year", row.book_year) # clear connection.rowtrace = None .. code-block:: output title The Picture of Dorian Gray id 37 year 1890 Now with dataclasses title The Picture of Dorian Gray id 37 year 1890 .. index:: Type conversion into/out of database (example code) .. _example_type_conversion: Type conversion into/out of database ------------------------------------ You can use :class:`apsw.ext.TypesConverterCursorFactory` to do conversion, both for types you define and for other types. .. code-block:: python import apsw.ext registrar = apsw.ext.TypesConverterCursorFactory() connection.cursor_factory = registrar # A type we define - deriving from SQLiteTypeAdapter automatically registers conversion # to a SQLite value class Point(apsw.ext.SQLiteTypeAdapter): def __init__(self, x, y): self.x = x self.y = y def __repr__(self) -> str: return f"Point({ self.x }, { self.y })" def __eq__(self, other: Point) -> bool: return isinstance(other, Point) and self.x == other.x and self.y == other.y def to_sqlite_value(self) -> str: # called to convert Point into something SQLite supports return f"{ self.x };{ self.y }" # This converter will be registered @staticmethod def convert_from_sqlite(value: str) -> Point: return Point(*(float(part) for part in value.split(";"))) # An existing type def complex_to_sqlite_value(c: complex) -> str: return f"{ c.real }+{ c.imag }" # ... requires manual registration registrar.register_adapter(complex, complex_to_sqlite_value) # conversion from a SQLite value requires registration registrar.register_converter("POINT", Point.convert_from_sqlite) # ... and for complex def sqlite_to_complex(v: str) -> complex: return complex(*(float(part) for part in v.split("+"))) registrar.register_converter("COMPLEX", sqlite_to_complex) # note that the type names are case sensitive and must match the # registration connection.execute("create table conversion(p POINT, c COMPLEX)") # convert going into database test_data = (Point(5.2, 7.6), 3 + 4j) connection.execute("insert into conversion values(?, ?)", test_data) print("inserted", test_data) # and coming back out for row in connection.execute("select * from conversion"): print("back out", row) print("equal", row == test_data) # clear registrar connection.cursor_factory = apsw.Cursor .. code-block:: output inserted (Point(5.2, 7.6), (3+4j)) back out (Point(5.2, 7.6), (3+4j)) equal True .. index:: Query details (example code) .. _example_query_details: Query details ------------- :meth:`apsw.ext.query_info` can provide a lot of information about a query (without running it) .. code-block:: python import apsw.ext # test tables connection.execute(""" create table customers( id INTEGER PRIMARY KEY, name CHAR, address CHAR); create table orders( id INTEGER PRIMARY KEY, customer_id INTEGER, item MY_OWN_TYPE); create index cust_addr on customers(address); """) query = """ SELECT * FROM orders JOIN customers ON orders.customer_id=customers.id WHERE address = ?; SELECT 7;""" bindings = ("123 Main Street", ) # ask for all information available qd = apsw.ext.query_info( connection, query, bindings=bindings, actions=True, # which tables/views etc and how they are accessed expanded_sql=True, # expands bindings into query string explain=True, # shows low level VDBE explain_query_plan=True, # how SQLite solves the query ) # help with formatting import pprint print("query", qd.query) print("\nbindings", qd.bindings) print("\nexpanded_sql", qd.expanded_sql) print("\nfirst_query", qd.first_query) print("\nquery_remaining", qd.query_remaining) print("\nis_explain", qd.is_explain) print("\nis_readonly", qd.is_readonly) print("\ndescription\n", pprint.pformat(qd.description)) if hasattr(qd, "description_full"): print("\ndescription_full\n", pprint.pformat(qd.description_full)) print("\nquery_plan\n", pprint.pformat(qd.query_plan)) print("\nFirst 5 actions\n", pprint.pformat(qd.actions[:5])) print("\nFirst 5 explain\n", pprint.pformat(qd.explain[:5])) .. code-block:: output query SELECT * FROM orders JOIN customers ON orders.customer_id=customers.id WHERE address = ?; SELECT 7; bindings ('123 Main Street',) expanded_sql SELECT * FROM orders JOIN customers ON orders.customer_id=customers.id WHERE address = '123 Main Street'; first_query SELECT * FROM orders JOIN customers ON orders.customer_id=customers.id WHERE address = ?; query_remaining SELECT 7; is_explain 0 is_readonly True description (('id', 'INTEGER'), ('customer_id', 'INTEGER'), ('item', 'MY_OWN_TYPE'), ('id', 'INTEGER'), ('name', 'CHAR'), ('address', 'CHAR')) description_full (('id', 'INTEGER', 'main', 'orders', 'id'), ('customer_id', 'INTEGER', 'main', 'orders', 'customer_id'), ('item', 'MY_OWN_TYPE', 'main', 'orders', 'item'), ('id', 'INTEGER', 'main', 'customers', 'id'), ('name', 'CHAR', 'main', 'customers', 'name'), ('address', 'CHAR', 'main', 'customers', 'address')) query_plan QueryPlan(detail='QUERY PLAN', sub=[QueryPlan(detail='SCAN orders', sub=None), QueryPlan(detail='SEARCH customers USING INTEGER PRIMARY KEY ' '(rowid=?)', sub=None)]) First 5 actions [QueryAction(action=21, action_name='SQLITE_SELECT', column_name=None, database_name=None, file_name=None, function_name=None, module_name=None, operation=None, pragma_name=None, pragma_value=None, table_name=None, trigger_name=None, trigger_or_view=None, view_name=None), QueryAction(action=20, action_name='SQLITE_READ', column_name='id', database_name='main', file_name=None, function_name=None, module_name=None, operation=None, pragma_name=None, pragma_value=None, table_name='orders', trigger_name=None, trigger_or_view=None, view_name=None), QueryAction(action=20, action_name='SQLITE_READ', column_name='customer_id', database_name='main', file_name=None, function_name=None, module_name=None, operation=None, pragma_name=None, pragma_value=None, table_name='orders', trigger_name=None, trigger_or_view=None, view_name=None), QueryAction(action=20, action_name='SQLITE_READ', column_name='item', database_name='main', file_name=None, function_name=None, module_name=None, operation=None, pragma_name=None, pragma_value=None, table_name='orders', trigger_name=None, trigger_or_view=None, view_name=None), QueryAction(action=20, action_name='SQLITE_READ', column_name='id', database_name='main', file_name=None, function_name=None, module_name=None, operation=None, pragma_name=None, pragma_value=None, table_name='customers', trigger_name=None, trigger_or_view=None, view_name=None)] First 5 explain [VDBEInstruction(addr=0, opcode='Init', comment=None, p1=0, p2=17, p3=0, p4=None, p5=0), VDBEInstruction(addr=1, opcode='OpenRead', comment=None, p1=0, p2=13, p3=0, p4='3', p5=0), VDBEInstruction(addr=2, opcode='OpenRead', comment=None, p1=1, p2=12, p3=0, p4='3', p5=0), VDBEInstruction(addr=3, opcode='Rewind', comment=None, p1=0, p2=16, p3=0, p4=None, p5=0), VDBEInstruction(addr=4, opcode='Column', comment=None, p1=0, p2=1, p3=1, p4=None, p5=0)] .. index:: Blob I/O (example code) .. _example_blob_io: Blob I/O -------- BLOBS (binary large objects) are supported by SQLite. Note that you cannot change the size of one, but you can allocate one filled with zeroes, and then later open it and read / write the contents similar to a file, without having the entire blob in memory. Use :meth:`Connection.blobopen` to open a blob. .. code-block:: python connection.execute("create table blobby(x,y)") # Add a blob we will fill in later connection.execute("insert into blobby values(1, zeroblob(10000))") # Or as a binding connection.execute("insert into blobby values(2, ?)", (apsw.zeroblob(20000), )) # Open a blob for writing. We need to know the rowid rowid = connection.execute("select ROWID from blobby where x=1").fetchall()[0][0] blob = connection.blobopen("main", "blobby", "y", rowid, True) blob.write(b"hello world") blob.seek(2000) blob.write(b"hello world, again") blob.close() .. index:: Authorizer (control what SQL can do) (example code) .. _example_authorizer: Authorizer (control what SQL can do) ------------------------------------ You can allow, deny, or ignore what SQL does. Use :attr:`Connection.authorizer` to set an authorizer. .. code-block:: python def auth(operation: int, p1: Optional[str], p2: Optional[str], db_name: Optional[str], trigger_or_view: Optional[str]) -> int: """Called when each operation is prepared. We can return SQLITE_OK, SQLITE_DENY or SQLITE_IGNORE""" # find the operation name print(apsw.mapping_authorizer_function[operation], p1, p2, db_name, trigger_or_view) if operation == apsw.SQLITE_CREATE_TABLE and p1 and p1.startswith("private"): return apsw.SQLITE_DENY # not allowed to create tables whose names start with private return apsw.SQLITE_OK # always allow connection.authorizer = auth connection.execute("insert into names values('foo')") connection.execute("select name from names limit 1") try: connection.execute("create table private_stuff(secret)") print("Created secret table!") except Exception as e: print(e) # Clear authorizer connection.authorizer = None .. code-block:: output SQLITE_INSERT names None main None SQLITE_SELECT None None None None SQLITE_READ names name main None SQLITE_INSERT sqlite_master None main None SQLITE_CREATE_TABLE private_stuff None main None AuthError: not authorized .. index:: Progress handler (example code) .. _example_progress_handler: Progress handler ---------------- Some operations (eg joins, sorting) can take many operations to complete. Register a progress handler callback with :meth:`Connection.setprogresshandler` which lets you provide feedback and allows cancelling. .. code-block:: python def some_numbers(how_many: int) -> Iterator[Tuple[int]]: for _ in range(how_many): yield (random.randint(0, 9999999999), ) # create a table with random numbers with connection: connection.execute("create table numbers(x)") connection.executemany("insert into numbers values(?)", some_numbers(100)) def progress_handler() -> bool: print("progress handler called") return False # returning True aborts # register handler every 50 vdbe instructions connection.setprogresshandler(progress_handler, 50) # Sorting the numbers to find the biggest for max_num in connection.execute("select max(x) from numbers"): print(max_num) # Clear handler connection.setprogresshandler(None) .. code-block:: output progress handler called progress handler called progress handler called progress handler called progress handler called progress handler called progress handler called progress handler called (9996980943,) .. index:: Commit hook (example code) .. _example_commit_hook: Commit hook ----------- A commit hook can allow or veto commits. Register a commit hook with :meth:`Connection.setcommithook`. .. code-block:: python def my_commit_hook() -> bool: print("in commit hook") hour = time.localtime()[3] if hour < 8 or hour > 17: print("no commits out of hours") return True # abort commits outside of 8am through 6pm print("commits okay at this time") return False # let commit go ahead connection.setcommithook(my_commit_hook) try: with connection: connection.execute("create table example(x,y,z); insert into example values (3,4,5)") except apsw.ConstraintError: print("commit was not allowed") connection.setcommithook(None) .. code-block:: output in commit hook no commits out of hours commit was not allowed .. index:: Update hook (example code) .. _example_update_hook: Update hook ----------- Update hooks let you know that data has been added, changed, or removed. For example you could use this to discard cached information. Register a hook using :meth:`Connection.setupdatehook`. .. code-block:: python def my_update_hook(type: int, db_name: str, table_name: str, rowid: int) -> None: op: str = apsw.mapping_authorizer_function[type] print(f"Updated: { op } db { db_name }, table { table_name }, rowid { rowid }") connection.setupdatehook(my_update_hook) connection.execute("insert into names values(?)", ("file93", )) connection.execute("update names set name=? where name=?", ("file94", "file93")) connection.execute("delete from names where name=?", ("file94", )) # Clear the hook connection.setupdatehook(None) .. code-block:: output Updated: SQLITE_INSERT db main, table names, rowid 7 Updated: SQLITE_UPDATE db main, table names, rowid 7 Updated: SQLITE_DELETE db main, table names, rowid 7 .. index:: Virtual tables (example code) .. _example_virtual_tables: Virtual tables -------------- Virtual tables let you provide data on demand as a SQLite table so you can use SQL queries against that data. :ref:`Read more about virtual tables `. .. code-block:: python # This example provides information about all the files in Python's # path. The minimum amount of code needed is shown, and lets SQLite # do all the heavy lifting. A more advanced table would use indices # and filters to reduce the number of rows shown to SQLite. # these first columns are used by our virtual table vtcolumns = ["rowid", "name", "directory"] def get_file_data(directories): "Returns a list of column names, and a list of all the files with their attributes" columns = None data = [] counter = 1 for directory in directories: for f in os.listdir(directory): if not os.path.isfile(os.path.join(directory, f)): continue counter += 1 st = os.stat(os.path.join(directory, f)) if columns is None: # we add on all the fields from os.stat columns = vtcolumns + [x for x in dir(st) if x.startswith("st_")] data.append([counter, f, directory] + [getattr(st, x) for x in columns[3:]]) return columns, data # This gets registered with the Connection class Source: def Create(self, db, modulename, dbname, tablename, *args): # the eval strips off layer of quotes columns, data = get_file_data([eval(a.replace("\\", "\\\\")) for a in args]) schema = "create table foo(" + ','.join(["'%s'" % (x, ) for x in columns[1:]]) + ")" return schema, Table(columns, data) Connect = Create # Represents a table class Table: def __init__(self, columns, data): self.columns = columns self.data = data def BestIndex(self, *args): return None def Open(self): return Cursor(self) def Disconnect(self): pass Destroy = Disconnect # Represents a cursor used during SQL query processing class Cursor: def __init__(self, table): self.table = table def Filter(self, *args): self.pos = 0 def Eof(self): return self.pos >= len(self.table.data) def Rowid(self): return self.table.data[self.pos][0] def Column(self, col): return self.table.data[self.pos][1 + col] def Next(self): self.pos += 1 def Close(self): pass # Register the module as filesource connection.createmodule("filesource", Source()) # Arguments to module - all directories in sys.path sysdirs = ",".join(["'%s'" % (x, ) for x in sys.path[1:] if len(x) and os.path.isdir(x)]) connection.execute("create virtual table sysfiles using filesource(" + sysdirs + ")") print("3 biggest files") for size, directory, file in connection.execute( "select st_size,directory,name from sysfiles order by st_size desc limit 3"): print(size, file, directory) print() print("3 oldest files") for ctime, directory, file in connection.execute( "select st_ctime,directory,name from sysfiles order by st_ctime limit 3"): print(ctime, file, directory) .. code-block:: output 3 biggest files 546696 _ctypes.cpython-310d-x86_64-linux-gnu.so /usr/lib/python3.10/lib-dynload 511328 _ssl.cpython-310d-x86_64-linux-gnu.so /usr/lib/python3.10/lib-dynload 450008 _testcapi.cpython-310d-x86_64-linux-gnu.so /usr/lib/python3.10/lib-dynload 3 oldest files 1641597423.5541885 sitecustomize.py /usr/lib/python3.10 1664920933.397524 _gdbm.cpython-310-x86_64-linux-gnu.so /usr/lib/python3.10/lib-dynload 1664921015.6046188 _tkinter.cpython-310-x86_64-linux-gnu.so /usr/lib/python3.10/lib-dynload .. index:: VFS - Virtual File System (example code) .. _example_vfs: VFS - Virtual File System ------------------------- VFS lets you control access to the filesystem from SQLite. APSW makes it easy to "inherit" from an existing VFS and monitor or alter data as it flows through. Read more about :ref:`VFS `. .. code-block:: python # This example VFS "obfuscates" the database file contents by xor all # bytes with 0xa5. URI parameters are also shown as a way you can # pass additional information for files. def obfuscate(data): if not data: return data return bytes([x ^ 0xa5 for x in data]) # Inheriting from a base of "" means the default vfs class ObfuscatedVFS(apsw.VFS): def __init__(self, vfsname="obfuscated", basevfs=""): self.vfs_name = vfsname self.base_vfs = basevfs apsw.VFS.__init__(self, self.vfs_name, self.base_vfs) # We want to return our own file implementation, but also # want it to inherit def xOpen(self, name, flags: int): if isinstance(name, apsw.URIFilename): print("xOpen of", name.filename()) # We can look at uri parameters print("fast is", name.uri_parameter("fast")) print("level is", name.uri_int("level", 3)) print("warp is", name.uri_boolean("warp", False)) print("notpresent is", name.uri_parameter("notpresent")) else: print("xOpen of", name) return ObfuscatedVFSFile(self.base_vfs, name, flags) # The file implementation where we override xRead and xWrite to call our # encryption routine class ObfuscatedVFSFile(apsw.VFSFile): def __init__(self, inheritfromvfsname, filename, flags): apsw.VFSFile.__init__(self, inheritfromvfsname, filename, flags) def xRead(self, amount, offset): return obfuscate(super().xRead(amount, offset)) def xWrite(self, data, offset): super().xWrite(obfuscate(data), offset) # To register the VFS we just instantiate it obfuvfs = ObfuscatedVFS() # Lets see what vfs are now available? print("VFS available", apsw.vfsnames()) # Make an obfuscated db, passing in some URI parameters # default open flags open_flags = apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE # add in using URI parameters open_flags |= apsw.SQLITE_OPEN_URI obfudb = apsw.Connection("file:myobfudb?fast=speed&level=7&warp=on&another=true", flags=open_flags, vfs=obfuvfs.vfs_name) # Check it works obfudb.execute("create table foo(x,y); insert into foo values(1,2)") # Check it really is obfuscated on disk print("What is on disk", repr(open("myobfudb", "rb").read()[:20])) # And unobfuscating it print("Unobfuscated disk", repr(obfuscate(open("myobfudb", "rb").read()[:20]))) # Tidy up obfudb.close() os.remove("myobfudb") .. code-block:: output VFS available ['unix', 'obfuscated', 'memdb', 'unix-excl', 'unix-dotfile', 'unix-none'] xOpen of /space/apsw/myobfudb fast is speed level is 7 warp is True notpresent is None xOpen of /space/apsw/myobfudb-journal xOpen of /space/apsw/myobfudb-journal What is on disk b'\xf6\xf4\xe9\xcc\xd1\xc0\x85\xc3\xca\xd7\xc8\xc4\xd1\x85\x96\xa5\xb5\xa5\xa4\xa4' Unobfuscated disk b'SQLite format 3\x00\x10\x00\x01\x01' .. index:: Limits (example code) .. _example_limits: Limits ------ SQLite lets you see and update various limits via :meth:`Connection.limit` .. code-block:: python # Print some limits for limit in ("LENGTH", "COLUMN", "ATTACHED"): name = "SQLITE_LIMIT_" + limit max_name = "SQLITE_MAX_" + limit # compile time limit orig = connection.limit(getattr(apsw, name)) print(name, orig) # To get the maximum, set to 0x7fffffff and then read value back connection.limit(getattr(apsw, name), 0x7fffffff) max = connection.limit(getattr(apsw, name)) print(max_name, " ", max) # Set limit for size of a string connection.execute("create table testlimit(s)") connection.execute("insert into testlimit values(?)", ("x" * 1024, )) # 1024 char string connection.limit(apsw.SQLITE_LIMIT_LENGTH, 1023) # limit is now 1023 try: connection.execute("insert into testlimit values(?)", ("y" * 1024, )) print("string exceeding limit was inserted") except apsw.TooBigError: print("Caught toobig exception") # reset back to largest value connection.limit(apsw.SQLITE_LIMIT_LENGTH, 0x7fffffff) .. code-block:: output SQLITE_LIMIT_LENGTH 1000000000 SQLITE_MAX_LENGTH 1000000000 SQLITE_LIMIT_COLUMN 2000 SQLITE_MAX_COLUMN 2000 SQLITE_LIMIT_ATTACHED 125 SQLITE_MAX_ATTACHED 125 Caught toobig exception .. index:: Backup an open database (example code) .. _example_backup: Backup an open database ----------------------- You can backup a database that is open. The pages are copied in batches of your choosing and allow continued use of the database. :ref:`Read more `. .. code-block:: python # We will copy a disk database into a memory database memcon = apsw.Connection(":memory:") # Copy into memory with memcon.backup("main", connection, "main") as backup: backup.step(10) # copy 10 pages in each batch .. index:: Shell (example code) .. _example_shell: Shell ----- APSW includes a :ref:`shell ` like the one in `SQLite `__, and is also extensible from Python. .. code-block:: python import apsw.shell # Here we use the shell to do a csv export and then dump part of the # database # Export to a StringIO import io output = io.StringIO() shell = apsw.shell.Shell(stdout=output, db=connection) # How to execute a dot command shell.process_command(".mode csv") shell.process_command(".headers on") # How to execute SQL shell.process_sql(""" create table csvtest(column1, column2 INTEGER); create index faster on csvtest(column1); insert into csvtest values(3, 4); insert into csvtest values('a b', NULL); """) # Or let the shell figure out SQL vs dot command shell.process_complete_line("select * from csvtest") # see the result print(output.getvalue()) # reset output output.seek(0) # make a dump of the same table shell.process_command(".dump csvtest%") # see the result print("\nDump output\n") print(output.getvalue()) .. code-block:: output column1,column2 3,4 a b, Dump output -- SQLite dump (by APSW 3.40.0.0) -- SQLite version 3.40.0 -- Date: Sun Nov 27 07:17:16 2022 -- Tables like: csvtest% -- Database: /space/apsw/dbfile -- User: rogerb @ clamps -- The values of various per-database settings PRAGMA page_size=4096; -- PRAGMA encoding='UTF-8'; -- PRAGMA auto_vacuum=NONE; -- PRAGMA max_page_count=1073741823; BEGIN TRANSACTION; -- Table csvtest DROP TABLE IF EXISTS csvtest; CREATE TABLE csvtest(column1, column2 INTEGER); INSERT INTO csvtest VALUES(3,4); INSERT INTO csvtest VALUES('a b',NULL); -- Triggers and indices on csvtest CREATE INDEX faster on csvtest(column1); COMMIT TRANSACTION; .. index:: Statistics (example code) .. _example_status: Statistics ---------- SQLite provides statistics by :meth:`status` .. code-block:: python current_usage, max_usage = apsw.status(apsw.SQLITE_STATUS_MEMORY_USED) print(f"SQLite memory usage { current_usage } max { max_usage }") .. code-block:: output SQLite memory usage 298832 max 345560 .. index:: Cleanup (example code) .. _example_cleanup: Cleanup ------- As a general rule you do not need to do any cleanup. Standard Python garbage collection will take of everything. Even if the process crashes with a connection in the middle of a transaction, the next time SQLite opens that database it will automatically rollback the partial data. .. code-block:: python # You close connections manually (useful if you want to catch exceptions) connection.close() # You can call close multiple times, and also indicate to ignore exceptions connection.close(True) # Deleting the database file. Note that there can be additional files # with suffixes like -wal, -shm, and -journal. os.remove("dbfile")