NIO Aplikační programování v Javě (BI-APJ) - 12 Ing. Jiří Daněček Katedra softwarového inženýrství Fakulta informačních technologií ČVUT Praha
Evropský sociální fond Praha & EU: Investujeme do vaší budoucnosti
Úvod ● Knihovna NIO (java.nio) byla zaveden do JDK od verze 1.4 jako nadstavba standardní knihovny java.io. ● Jejím hlavním účelem je zprostředkovat aplikacím přímý přístup k rychlým nativním I/O operacím, které poskytuje operační systém. ● Hlavní vlastnosti NIO: ○ vysoká rychlost ○ využití blokových přenosů ○ neblokovaný vstup a výstup ○ možnost využití DMA ○ využití SSL
Abstraktní třída Buffer ● Buffer je konečná vyrovnávací paměť slotů na elementy primitivního typu. Buffer pracuje buď v režimu čtení nebo zápisu. Buffer má následující vlastnosti: ○ capacity - fixní počet slotů, ○ position - index první pozice, která se bude dále zpracovávat. V režimu čtení je to první element, který se bude číst, v režimu zápisu je to první volný slot. ○ limit - index první pozice, která se naopak nebude zracovávat. V režimu čtení z bufferu je to první volný slot, v režimu zápisu je to capacity. ● Pro buffer platí invariant: 0 <= mark <= position <= limit <= capacity ● Pro každý primitivní typ je definována podtřída: ByteBuffer, IntBuffer, ... atd.
Základní operace nad Buffer ● Do bufferu se v jednom okamžiku buď zapisuje nebo se z něj čte. ● public final Buffer flip() - překlopení bufferu z režimu zápis do režimu čtení: ○ limit je nastaven na position ○ position je nastavena na 0. ● public final Buffer clear() - vymazání bufferu. ○ position je nastavena na 0, ○ limit je nastaven na capacity. ● public final int remaining() - vrátí počet elementů mezi position a limit tj. buď počet zbývajících elementů v režimu čtení nebo počet zbývajících volných slotů v režimu zápisu.
Třída ByteBuffer ● Základní konkrétní buffer. Umožňuje: ○ zápis a čtení jednotlivých bytů a dále polí bytů. ○ zápis a čtení primitivních datových typů, ○ vytváření pohledových bufferů. ● Metody pro vytvoření objektu ByteBuffer: ○ static ByteBuffer allocateDirect(int capacity) - vytvoří tzv. přímý buffer. Pro přímý buffer JVM "make a best effort to perform native I/O operations directly upon it" - t.zn bez kopírovacích operací po každé nativní I/O operaci. Přímý buffer může existovat mimo oblast heapu! ○ static ByteBuffer allocate(int capacity) - vytvoří nepřímý ByteBuffer o zadané kapacitě, který je zálohován bytovým polem.
Základní metody pro čtení a zápis ● Metody pro čtení z ByteBuffer. Pokud operaci nelze provést je hozena výjimka BufferUnderflowException: ○ byte get() - relativní čtení. Vrátí byte a inkrementuje position, ○ byte get(int index) - absolutní čtení z indexu, ○ ByteBuffer get(byte[] dst) - relativní blokové čtení. Přesune byty z bufferu do dst. ● Metody pro zápis do ByteBuffer. Pokud operaci nelze provést je hozena výjimka BufferOverflowException: ○ ByteBuffer put(byte b) - relativní zápis bytu, ○ ByteBuffer put(int index, byte b) - absolutní zápis bytu, ○ ByteBuffer put(byte[] src) - relativní zápis pole bytů.
Čtení a zápis jednoduchých typů ● ByteBuffer má metody pro čtení a zápis jednotlivých elemetů jednoduchých typů. Např. pro int: ○ int getInt() - relativní čtení z bufferu. Čte 4 byty z aktuální pozice, vrátí je transformované na hodnotu int. Inkrementuje position o 4, ○ int getInt(int index) - totéž, ale absolutně, ○ ByteBuffer putInt(int value) - relativní zápis na aktuální pozici, position je inkremetováno o 4, ○ ByteBuffer putInt(int index, int value) - totéž, ale absolutně.
Specializovaný pohledový buffer ● U homogenních dat se místo používání typových metod getXXX a putXXX doporučují tzv. pohledové buffery (view buffer), které jsou zálohovány ByteBufferem. Specializovaný pohledový buffer pro typ int se vytvoří metodou: IntBuffer asIntBuffer() ● Pohledové buffery mají následující výhody: ○ jsou indexovány v typovaných elementech a ne v bytech, ○ mají typované put a get metody ○ pokud je zálohový Bytebuffer přímý, jsou také přímé a tudíž efektivní.
Specializované buffery ● Pro každý primitivní typ existují specializované buffery. Např.: public abstract class IntBuffer extends Buffer implements Comparable ● Specializované buffery definuje operace absolutní a relativní get a put operace na jeden element nebo nebo na pole elementů. ● Vytváření specializovaných bufferů: ○ zabalením existujícího pole: static IntBuffer wrap(int[] array) - vytvoří buffer zálohovaný zadaným polem, ○ přímou alokací: static IntBuffer allocate(int capacity) ○ vytvořením pohledu na existující ByteBuffer. ● Specializované buffery mohou být přímé nebo nepřímé.
Kanály ● Kanály jsou reprezentanty entit, které umožňují čtení a zápis dat (obdoba streamů v java.io) jako jsou soubory, hardwarová zařízení, síťové sokety atp.: interface Channel extends Closeable ● Na rozdíl od streamů mohou být kanály duplexní. Po vytvoření je kanál otevřen. Po uzavření jej nelze znovu otevřít a zavolání operace hodí výjimku ClosedChannelException: ○ boolean isOpen() ○ void close() throws IOException ● Kanály jsou thread-safe.
Kanál pro čtení a zápis ● Rozhraní ReadableByteChannel a WritableByteChannel definují kanály pro čtení resp zápis. Na kanálu může probíhat pouze jedna čtecí resp. zápisová operace současně: ○ int read(ByteBuffer dst) throws IOException - čte posloupnost bytů z kanálu do bufferu. Operace se pokusí přečíst až dst.remaining() bytů. Vrací skutečný počet přečtených bytů a bufferu position je o tuto hodnotu inkremetována. V blokovacím módu přečte alespoň jeden byte. ○ int write(ByteBuffer src) throws IOException - pokusí se zapsat do kanálu blok src.remaining() bytů z bufferu. Počet zapsaných bytů závisí na typu kanálu (Např. SocketChannel v neblokovacím módu zapíše pouze tolik bytů, kolik se aktuálně vejde do výstupního bufferu.
Třída FileChannel ● Kanál pro čtení, zápis, mapování a manipulaci se soubory. Kanál se vytváří operací getChannel nad objektem FileInputStream, FileOutputStream resp. RandomAccessFile. ● long position() throws IOException FileChannel position(long newPosition) throws IOException vrací resp. nastavuje pozici kanálu v souboru od které se provádí čtení resp. zápis, ● long size() throws IOException FileChannel truncate(long size) throws IOException - vrátí resp. nastaví velikost souboru, ● void force(boolean metaData) throws IOException - způsobí okamžitý zápis dat do souboru.
Kopírování souborů static void copy(File infile, File outfile) throws IOException { FileInputStream fin = new FileInputStream(infile); FileOutputStream fout = new FileOutputStream(outfile); FileChannel fcin = fin.getChannel(); FileChannel fcout = fout.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); // ByteBuffer buffer = ByteBuffer.allocateDirect(1024); while (true) { buffer.clear(); int r = fcin.read( buffer ); if (r==-1) { break; } buffer.flip(); fcout.write( buffer ); } }
Třída FileChannel II ● MappedByteBuffer map(FileChannel.MapMode mode, long position, long size) throws IOException - namapuje zadaný region souboru přímo do paměti. Chování kanálu je dáno hodnotou parametru mode (MapMode. READ_ONLY, MapMode.READ_WRITE, MapMode. PRIVATE) ● FileLock tryLock(long position, long size, boolean shared) throws IOException - neblokovací pokus získat zámek pro zadaný region souboru, ● FileLock lock(long position, long size, boolean shared) throws IOException - blokovací operace pro získání zámku.
Použití mapování private static int sum(ByteBuffer bb) { int sum = 0; while (bb.hasRemaining()) { if ((sum & 1) != 0) sum = (sum >> 1) + 0x8000; else sum >>= 1; sum += bb.get() & 0xff; sum &= 0xffff; } return sum; } private static int sum(File f) throws IOException { FileInputStream fis = new FileInputStream(f); FileChannel fc = fis.getChannel(); sz = (int)fc.size(); MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, sz); return sum(bb); }
SelectableChannel ● Abstraktní třída SelectableChannel je kanál, který může být testován na připravenost provést nějakou operaci - např. čtení dat. SelectableChannel musí být napřed registrován u objektu Selector: ● final SelectionKey register(Selector sel, int ops) throws ClosedChannelException - registruje kanál u zadaného selektoru a vrací selekční klíč. Parametr ops určuje jednotlivými bity množinu operací, které kanál nabízí (read, write, connect, accept). ● Konkrétní podtřídy jsou DatagramChannel, ServerSocketChannel, SocketChannel.
Třída Selector ● Multiplexor zaregistrovaných kanálů. Vytváří se faktorovou metodou: ○ static Selector open() throws IOException ● Metody: ○ int select(long timeout) throws IOException - blokovací operace detekující kanály, které jsou připravené provést I/O operaci. Vrací nenulový počet připravených kanálů. ○ int selectNow() throws IOException - neblokovací operace se stejnou funkcí, která však může vrátit 0. ○ public abstract Set<SelectionKey> selectedKeys() - vrací množinu klíčů připravených kanálů.
SelectionKey ● Objekty SelectionKey (selekční klíč) reprezentují registraci kanálů u selektoru. Selekční klíče jsou vytvořeny registrační metodou selektoru: register. ● Selekční klíč obsahuje dvě množiny operací: ○ interest_set - operace, na které budou testovány kanály selekční metodou selektoru. Operace jsou zadány při registraci kanálu a mohou být později změněny metodou interestOps(int), ○ ready-set - operace, které je kanál aktuálně připraven vykonat.
NIO Server public class NioServer implements Runnable { private InetAddress hostAddress; private int port; private ServerSocketChannel serverChannel; private Selector selector; private ByteBuffer readBuffer = ByteBuffer.allocate(8192); public NioServer(InetAddress hostAddress, int port) throws IOException { this.hostAddress = hostAddress; this.port = port; this.selector = this.initSelector(hostAddress, port); }
Vytvoření selektoru private Selector initSelector(InetAddress hostAddress, int port;) throws IOException { Selector socketSelector = SelectorProvider.provider().openSelector(); this.serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port); serverChannel.socket().bind(isa); serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); return socketSelector; }
Server run public void run() { while (true) { try { int n = this.selector.select(); Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) continue; if (key.isAcceptable()) this.accept(key); } else if (key.isReadable()) { this.read(key); } } catch (Exception e) { e.printStackTrace(); } } }
Akceptace klíče private void accept(SelectionKey key) throws IOException { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); Socket socket = socketChannel.socket(); socketChannel.configureBlocking(false); socketChannel.register(this.selector, SelectionKey.OP_READ); }
Čtení dat private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); this.readBuffer.clear(); int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) // The remote forcibly closed the connection, { socketChannel.close(); key.cancel(); return;} if (numRead == -1) { // Remote entity shut the socket down cleanly. { socketChannel.close(); key.cancel(); return;} this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead); }